From e517b8ce143ef62800cfaf9720f0120b217d7914 Mon Sep 17 00:00:00 2001 From: Brian Carlson Date: Sat, 5 Aug 2017 18:27:29 -0500 Subject: [PATCH] WIP --- .travis.yml | 4 +- index.js | 118 +++++++++++++++----------------- package.json | 19 +++-- test/close.js | 46 ------------- test/helper.js | 2 +- test/issue-3.js | 2 +- test/mocha.opts | 1 + test/stream-tester-timestamp.js | 18 ++--- 8 files changed, 78 insertions(+), 132 deletions(-) diff --git a/.travis.yml b/.travis.yml index 0e1916fb..9c09f74b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,7 @@ language: node_js node_js: - - "0.10" - - "0.12" - "4.2" - "6" - - "7" + - "8" env: - PGUSER=postgres PGDATABASE=postgres diff --git a/index.js b/index.js index e1965092..6665b6b0 100644 --- a/index.js +++ b/index.js @@ -1,69 +1,63 @@ +'use strict'; var util = require('util') var Cursor = require('pg-cursor') -var Readable = require('readable-stream').Readable +var Readable = require('stream').Readable -var QueryStream = module.exports = function(text, values, options) { - var self = this - this._reading = false - this._closing = false - options = options || { } - Cursor.call(this, text, values) - Readable.call(this, { - objectMode: true, - highWaterMark: options.highWaterMark || 1000 - }) - this.batchSize = options.batchSize || 100 - this.once('end', function() { - process.nextTick(function() { - self.emit('close') +class PgQueryStream extends Readable { + constructor(text, values, options) { + super(Object.assign({ objectMode: true }, options)) + this.cursor = new Cursor(text, values) + this._reading = false + this._closed = false + this.batchSize = (options || { }).batchSize || 100 + + // delegate Submittable callbacks to cursor + this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor) + this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor) + this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor) + this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor) + this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor) + this.handleError = this.cursor.handleError.bind(this.cursor) + } + + submit(connection) { + this.cursor.submit(connection) + return this + } + + close(callback) { + this._closed = true + const cb = callback || (() => this.emit('close')) + this.cursor.close(cb) + } + + _read(size) { + if (this._reading || this._closed) { + return false + } + this._reading = true + const readAmount = Math.max(size, this.batchSize) + this.cursor.read(readAmount, (err, rows) => { + if (this._closed) { + return + } + if (err) { + return this.emit('error', err) + } + // if we get a 0 length array we've read to the end of the cursor + if (!rows.length) { + this._closed = true + setImmediate(() => this.emit('close')) + return this.push(null) + } + + // push each row into the stream + this._reading = false + for(var i = 0; i < rows.length; i++) { + this.push(rows[i]) + } }) - }) -} - -util.inherits(QueryStream, Readable) - -//copy cursor prototype to QueryStream -//so we can handle all the events emitted by the connection -for(var key in Cursor.prototype) { - if(key == 'read') { - QueryStream.prototype._fetch = Cursor.prototype.read - } else { - QueryStream.prototype[key] = Cursor.prototype[key] } } -QueryStream.prototype.close = function(cb) { - this._closing = true - var self = this - Cursor.prototype.close.call(this, function(err) { - if (cb) { cb(err); } - if(err) return self.emit('error', err) - process.nextTick(function() { - self.push(null) - }) - }) -} - -QueryStream.prototype._read = function(n) { - if(this._reading || this._closing) return false - this._reading = true - var self = this - this._fetch(this.batchSize, function(err, rows) { - if(err) { - return self.emit('error', err) - } - - if (self._closing) { return; } - - if(!rows.length) { - process.nextTick(function() { - self.push(null) - }) - return - } - self._reading = false - for(var i = 0; i < rows.length; i++) { - self.push(rows[i]) - } - }) -} +module.exports = PgQueryStream diff --git a/package.json b/package.json index 02f78231..5f0e0a3f 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "Postgres query result returned as readable stream", "main": "index.js", "scripts": { - "test": "mocha test/*.js -R spec" + "test": "mocha" }, "repository": { "type": "git", @@ -17,21 +17,20 @@ "stream" ], "author": "Brian M. Carlson", - "license": "BSD-2-Clause", + "license": "MIT", "bugs": { "url": "https://github.com/brianc/node-pg-query-stream/issues" }, "devDependencies": { - "pg.js": "*", - "concat-stream": "~1.0.1", - "through": "~2.3.4", - "stream-tester": "0.0.5", - "stream-spec": "~0.3.5", "JSONStream": "~0.7.1", - "mocha": "~1.17.1" + "concat-stream": "~1.0.1", + "mocha": "^3.5.0", + "pg": "6.x", + "stream-spec": "~0.3.5", + "stream-tester": "0.0.5", + "through": "~2.3.4" }, "dependencies": { - "pg-cursor": "1.0.0", - "readable-stream": "^2.0.4" + "pg-cursor": "1.2.1" } } diff --git a/test/close.js b/test/close.js index c0a30096..a8820c13 100644 --- a/test/close.js +++ b/test/close.js @@ -34,52 +34,6 @@ helper('early close', function(client) { }) }) -helper('should not throw errors after early close', function(client) { - it('can be closed early without error', function(done) { - var stream = new QueryStream('SELECT * FROM generate_series(0, 2000) num'); - var query = client.query(stream); - var fetchCount = 0; - var errorCount = 0; - - - function waitForErrors() { - - setTimeout(function () { - assert(errorCount === 0, 'should not throw a ton of errors'); - done(); - }, 10); - } - - // hack internal _fetch function to force query.close immediately after _fetch is called (simulating the race condition) - // race condition: if close is called immediately after _fetch is called, but before results are returned, errors are thrown - // when the fetch results are pushed to the readable stream after its already closed. - query._fetch = (function (_fetch) { - return function () { - - // wait for the second fetch. closing immediately after the first fetch throws an entirely different error :( - if (fetchCount++ === 0) { - return _fetch.apply(this, arguments); - } - - var results = _fetch.apply(this, arguments); - - query.close(); - waitForErrors(); - - query._fetch = _fetch; // we're done with our hack, so restore the original _fetch function. - - return results; - } - }(query._fetch)); - - query.on('error', function () { errorCount++; }); - - query.on('readable', function () { - query.read(); - }); - }); -}); - helper('close callback', function (client) { it('notifies an optional callback when the conneciton is closed', function (done) { var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2}); diff --git a/test/helper.js b/test/helper.js index ce04d127..f4e42720 100644 --- a/test/helper.js +++ b/test/helper.js @@ -1,4 +1,4 @@ -var pg = require('pg.js') +var pg = require('pg') module.exports = function(name, cb) { describe(name, function() { var client = new pg.Client() diff --git a/test/issue-3.js b/test/issue-3.js index 0c822c97..302927d2 100644 --- a/test/issue-3.js +++ b/test/issue-3.js @@ -1,4 +1,4 @@ -var pg = require('pg.js') +var pg = require('pg') var QueryStream = require('../') describe('end semantics race condition', function() { before(function(done) { diff --git a/test/mocha.opts b/test/mocha.opts index 8dcb4d8d..46e8e69d 100644 --- a/test/mocha.opts +++ b/test/mocha.opts @@ -1 +1,2 @@ --no-exit +--bail diff --git a/test/stream-tester-timestamp.js b/test/stream-tester-timestamp.js index d62dd566..c9c2b212 100644 --- a/test/stream-tester-timestamp.js +++ b/test/stream-tester-timestamp.js @@ -1,4 +1,4 @@ -var pg = require('pg.js') +var pg = require('pg') var QueryStream = require('../') var spec = require('stream-spec') var assert = require('assert') @@ -10,20 +10,20 @@ require('./helper')('stream tester timestamp', function(client) { var stream = new QueryStream(sql, []) var ended = false var query = client.query(stream) - query. - on('end', function() { ended = true }) + query.on('end', function() { ended = true }) spec(query) .readable() - .pausable({strict: true}) - .validateOnExit() - ; + .pausable({ strict: true }) + .validateOnExit(); var checkListeners = function() { assert(stream.listeners('end').length < 10) - if (!ended) + if (!ended) { setImmediate(checkListeners) - else + } + else { done() + } } checkListeners() }) -}) \ No newline at end of file +})