Merge pull request #29 from brianc/upgrade-pg-cursor

Upgrade to newest version of pg-cursor
Brian C 2017-08-06 12:41:34 -05:00 committed by GitHub
commit 57f62df315
17 changed files with 1478 additions and 213 deletions

.eslintrc (new file)

@@ -0,0 +1,9 @@
+{
+  "extends": "standard",
+  "env": {
+    "mocha": true
+  },
+  "rules": {
+    "no-new-func": "off"
+  }
+}

.travis.yml

@@ -1,9 +1,7 @@
 language: node_js
 node_js:
-  - "0.10"
-  - "0.12"
-  - "4.2"
   - "6"
   - "7"
+  - "8"
 env:
   - PGUSER=postgres PGDATABASE=postgres

index.js
@@ -1,69 +1,62 @@
-var util = require('util')
+'use strict'
 var Cursor = require('pg-cursor')
-var Readable = require('readable-stream').Readable
+var Readable = require('stream').Readable
 
-var QueryStream = module.exports = function(text, values, options) {
-  var self = this
-  this._reading = false
-  this._closing = false
-  options = options || { }
-  Cursor.call(this, text, values)
-  Readable.call(this, {
-    objectMode: true,
-    highWaterMark: options.highWaterMark || 1000
-  })
-  this.batchSize = options.batchSize || 100
-  this.once('end', function() {
-    process.nextTick(function() {
-      self.emit('close')
-    })
-  })
-}
-
-util.inherits(QueryStream, Readable)
-
-//copy cursor prototype to QueryStream
-//so we can handle all the events emitted by the connection
-for(var key in Cursor.prototype) {
-  if(key == 'read') {
-    QueryStream.prototype._fetch = Cursor.prototype.read
-  } else {
-    QueryStream.prototype[key] = Cursor.prototype[key]
-  }
-}
-
-QueryStream.prototype.close = function(cb) {
-  this._closing = true
-  var self = this
-  Cursor.prototype.close.call(this, function(err) {
-    if (cb) { cb(err); }
-    if(err) return self.emit('error', err)
-    process.nextTick(function() {
-      self.push(null)
-    })
-  })
-}
-
-QueryStream.prototype._read = function(n) {
-  if(this._reading || this._closing) return false
-  this._reading = true
-  var self = this
-  this._fetch(this.batchSize, function(err, rows) {
-    if(err) {
-      return self.emit('error', err)
-    }
-    if (self._closing) { return; }
-    if(!rows.length) {
-      process.nextTick(function() {
-        self.push(null)
-      })
-      return
-    }
-    self._reading = false
-    for(var i = 0; i < rows.length; i++) {
-      self.push(rows[i])
-    }
-  })
-}
+class PgQueryStream extends Readable {
+  constructor (text, values, options) {
+    super(Object.assign({ objectMode: true }, options))
+    this.cursor = new Cursor(text, values)
+    this._reading = false
+    this._closed = false
+    this.batchSize = (options || { }).batchSize || 100
+
+    // delegate Submittable callbacks to cursor
+    this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
+    this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
+    this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
+    this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
+    this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
+    this.handleError = this.cursor.handleError.bind(this.cursor)
+  }
+
+  submit (connection) {
+    this.cursor.submit(connection)
+    return this
+  }
+
+  close (callback) {
+    this._closed = true
+    const cb = callback || (() => this.emit('close'))
+    this.cursor.close(cb)
+  }
+
+  _read (size) {
+    if (this._reading || this._closed) {
+      return false
+    }
+    this._reading = true
+    const readAmount = Math.max(size, this.batchSize)
+    this.cursor.read(readAmount, (err, rows) => {
+      if (this._closed) {
+        return
+      }
+      if (err) {
+        return this.emit('error', err)
+      }
+      // if we get a 0 length array we've read to the end of the cursor
+      if (!rows.length) {
+        this._closed = true
+        setImmediate(() => this.emit('close'))
+        return this.push(null)
+      }
+
+      // push each row into the stream
+      this._reading = false
+      for (var i = 0; i < rows.length; i++) {
+        this.push(rows[i])
+      }
+    })
+  }
+}
+
+module.exports = PgQueryStream
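
For orientation, a minimal usage sketch of the rewritten stream, assembled from the patterns in the test suite below. It assumes pg 6.x as the client and that connection settings come from the standard PG* environment variables (as in test/helper.js and .travis.yml):

var pg = require('pg')
var QueryStream = require('pg-query-stream')

var client = new pg.Client()
client.connect(function (err) {
  if (err) throw err
  // client.query() submits the stream to the connection; rows are then
  // fetched through pg-cursor batchSize at a time instead of being
  // buffered in memory all at once
  var query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000], { batchSize: 100 })
  var stream = client.query(query)
  stream.on('data', function (row) {
    // each datum is a single row, e.g. { num: 0 }
  })
  stream.on('end', function () {
    client.end()
  })
})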

package.json

@@ -4,7 +4,7 @@
   "description": "Postgres query result returned as readable stream",
   "main": "index.js",
   "scripts": {
-    "test": "mocha test/*.js -R spec"
+    "test": "mocha"
   },
   "repository": {
     "type": "git",
"repository": {
"type": "git",
@@ -17,21 +17,26 @@
     "stream"
   ],
   "author": "Brian M. Carlson",
-  "license": "BSD-2-Clause",
+  "license": "MIT",
   "bugs": {
     "url": "https://github.com/brianc/node-pg-query-stream/issues"
  },
   "devDependencies": {
-    "pg.js": "*",
-    "concat-stream": "~1.0.1",
-    "through": "~2.3.4",
-    "stream-tester": "0.0.5",
-    "stream-spec": "~0.3.5",
     "JSONStream": "~0.7.1",
-    "mocha": "~1.17.1"
+    "concat-stream": "~1.0.1",
+    "eslint": "^4.4.0",
+    "eslint-config-standard": "^10.2.1",
+    "mocha": "^3.5.0",
+    "pg": "6.x",
+    "stream-spec": "~0.3.5",
+    "stream-tester": "0.0.5",
+    "through": "~2.3.4"
   },
   "dependencies": {
-    "pg-cursor": "1.0.0",
-    "readable-stream": "^2.0.4"
+    "eslint-plugin-import": "^2.7.0",
+    "eslint-plugin-node": "^5.1.1",
+    "eslint-plugin-promise": "^3.5.0",
+    "eslint-plugin-standard": "^3.0.1",
+    "pg-cursor": "1.3.0"
   }
 }

test/close.js

@@ -1,100 +1,52 @@
 var assert = require('assert')
 var concat = require('concat-stream')
-var tester = require('stream-tester')
-var JSONStream = require('JSONStream')
 
 var QueryStream = require('../')
 var helper = require('./helper')
 
-helper('close', function(client) {
-  it('emits close', function(done) {
+helper('close', function (client) {
+  it('emits close', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
     var query = client.query(stream)
-    query.pipe(concat(function() {}))
+    query.pipe(concat(function () {}))
     query.on('close', done)
   })
 })
 
-helper('early close', function(client) {
-  it('can be closed early', function(done) {
+helper('early close', function (client) {
+  it('can be closed early', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
     var query = client.query(stream)
     var readCount = 0
-    query.on('readable', function() {
+    query.on('readable', function () {
       readCount++
       query.read()
     })
-    query.once('readable', function() {
+    query.once('readable', function () {
      query.close()
    })
-    query.on('close', function() {
+    query.on('close', function () {
      assert(readCount < 10, 'should not have read more than 10 rows')
      done()
    })
  })
})
 
-helper('should not throw errors after early close', function(client) {
-  it('can be closed early without error', function(done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, 2000) num');
-    var query = client.query(stream);
-    var fetchCount = 0;
-    var errorCount = 0;
-
-    function waitForErrors() {
-      setTimeout(function () {
-        assert(errorCount === 0, 'should not throw a ton of errors');
-        done();
-      }, 10);
-    }
-
-    // hack internal _fetch function to force query.close immediately after _fetch is called (simulating the race condition)
-    // race condition: if close is called immediately after _fetch is called, but before results are returned, errors are thrown
-    // when the fetch results are pushed to the readable stream after its already closed.
-    query._fetch = (function (_fetch) {
-      return function () {
-        // wait for the second fetch. closing immediately after the first fetch throws an entirely different error :(
-        if (fetchCount++ === 0) {
-          return _fetch.apply(this, arguments);
-        }
-        var results = _fetch.apply(this, arguments);
-        query.close();
-        waitForErrors();
-        query._fetch = _fetch; // we're done with our hack, so restore the original _fetch function.
-        return results;
-      }
-    }(query._fetch));
-
-    query.on('error', function () { errorCount++; });
-    query.on('readable', function () {
-      query.read();
-    });
-  });
-});
-
 helper('close callback', function (client) {
   it('notifies an optional callback when the conneciton is closed', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2});
-    var query = client.query(stream);
-    query.once('readable', function() { // only reading once
-      query.read();
-    });
-    query.once('readable', function() {
+    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
+    var query = client.query(stream)
+    query.once('readable', function () { // only reading once
+      query.read()
+    })
+    query.once('readable', function () {
       query.close(function () {
         // nothing to assert. This test will time out if the callback does not work.
-        done();
-      });
-    });
+        done()
+      })
+    })
     query.on('close', function () {
-      assert(false, "close event should not fire"); // no close event because we did not read to the end of the stream.
-    });
-  });
-});
+      assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
+    })
  })
})
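
A condensed sketch of the close semantics these tests pin down, assuming a connected client as in the tests above: in the rewritten close(), passing a callback invokes it once the cursor is torn down and suppresses the 'close' event, while calling close() with no arguments emits 'close' instead.

var query = client.query(new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000]))
query.once('readable', function () {
  // stop the query early; the callback fires instead of a 'close' event
  query.close(function () {
    client.end()
  })
})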

test/concat.js

@@ -5,14 +5,14 @@ var helper = require('./helper')
 
 var QueryStream = require('../')
 
-helper('concat', function(client) {
-  it('concats correctly', function(done) {
+helper('concat', function (client) {
+  it('concats correctly', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
     var query = client.query(stream)
-    query.pipe(through(function(row) {
+    query.pipe(through(function (row) {
       this.push(row.num)
-    })).pipe(concat(function(result) {
-      var total = result.reduce(function(prev, cur) {
+    })).pipe(concat(function (result) {
+      var total = result.reduce(function (prev, cur) {
         return prev + cur
       })
       assert.equal(total, 20100)

test/error.js

@@ -3,20 +3,20 @@ var helper = require('./helper')
 
 var QueryStream = require('../')
 
-helper('error', function(client) {
-  it('receives error on stream', function(done) {
+helper('error', function (client) {
+  it('receives error on stream', function (done) {
     var stream = new QueryStream('SELECT * FROM asdf num', [])
     var query = client.query(stream)
-    query.on('error', function(err) {
+    query.on('error', function (err) {
       assert(err)
       assert.equal(err.code, '42P01')
       done()
     }).on('data', function () {
-      //noop to kick of reading
+      // noop to kick of reading
     })
   })
 
-  it('continues to function after stream', function(done) {
+  it('continues to function after stream', function (done) {
     client.query('SELECT NOW()', done)
   })
 })

test/fast-reader.js

@@ -2,27 +2,26 @@ var assert = require('assert')
 var helper = require('./helper')
 var QueryStream = require('../')
 
-helper('fast reader', function(client) {
-  it('works', function(done) {
+helper('fast reader', function (client) {
+  it('works', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
     var query = client.query(stream)
     var result = []
-    var count = 0
-    stream.on('readable', function() {
+    stream.on('readable', function () {
       var res = stream.read()
       if (result.length !== 201) {
         assert(res, 'should not return null on evented reader')
       } else {
-        //a readable stream will emit a null datum when it finishes being readable
-        //https://nodejs.org/api/stream.html#stream_event_readable
+        // a readable stream will emit a null datum when it finishes being readable
+        // https://nodejs.org/api/stream.html#stream_event_readable
         assert.equal(res, null)
       }
-      if(res) {
+      if (res) {
         result.push(res.num)
       }
     })
-    stream.on('end', function() {
-      var total = result.reduce(function(prev, cur) {
+    stream.on('end', function () {
+      var total = result.reduce(function (prev, cur) {
         return prev + cur
       })
       assert.equal(total, 20100)

test/helper.js

@@ -1,15 +1,15 @@
-var pg = require('pg.js')
-module.exports = function(name, cb) {
-  describe(name, function() {
+var pg = require('pg')
+module.exports = function (name, cb) {
+  describe(name, function () {
     var client = new pg.Client()
-    before(function(done) {
+    before(function (done) {
       client.connect(done)
     })
 
     cb(client)
 
-    after(function(done) {
+    after(function (done) {
       client.end()
       client.on('end', done)
     })

test/instant.js

@@ -3,11 +3,11 @@ var concat = require('concat-stream')
 
 var QueryStream = require('../')
 
-require('./helper')('instant', function(client) {
-  it('instant', function(done) {
+require('./helper')('instant', function (client) {
+  it('instant', function (done) {
     var query = new QueryStream('SELECT pg_sleep(1)', [])
     var stream = client.query(query)
-    stream.pipe(concat(function(res) {
+    stream.pipe(concat(function (res) {
       assert.equal(res.length, 1)
       done()
     }))

test/issue-3.js

@@ -1,7 +1,7 @@
-var pg = require('pg.js')
+var pg = require('pg')
 var QueryStream = require('../')
 
-describe('end semantics race condition', function() {
-  before(function(done) {
+describe('end semantics race condition', function () {
+  before(function (done) {
     var client = new pg.Client()
     client.connect()
     client.on('drain', client.end.bind(client))
@@ -9,20 +9,20 @@ describe('end semantics race condition', function () {
     client.query('create table IF NOT EXISTS p(id serial primary key)')
     client.query('create table IF NOT EXISTS c(id int primary key references p)')
   })
 
-  it('works', function(done) {
+  it('works', function (done) {
     var client1 = new pg.Client()
     client1.connect()
     var client2 = new pg.Client()
     client2.connect()
 
-    var qr = new QueryStream("INSERT INTO p DEFAULT VALUES RETURNING id")
+    var qr = new QueryStream('INSERT INTO p DEFAULT VALUES RETURNING id')
     client1.query(qr)
     var id = null
-    qr.on('data', function(row) {
+    qr.on('data', function (row) {
       id = row.id
     })
     qr.on('end', function () {
-      client2.query("INSERT INTO c(id) VALUES ($1)", [id], function (err, rows) {
+      client2.query('INSERT INTO c(id) VALUES ($1)', [id], function (err, rows) {
         client1.end()
         client2.end()
         done(err)

test/mocha.opts

@@ -1 +1,2 @@
 --no-exit
+--bail

test/pauses.js

@@ -1,16 +1,16 @@
 var assert = require('assert')
 var concat = require('concat-stream')
 var tester = require('stream-tester')
 var JSONStream = require('JSONStream')
 var QueryStream = require('../')
 
-require('./helper')('pauses', function(client) {
-  it('pauses', function(done) {
+require('./helper')('pauses', function (client) {
+  it('pauses', function (done) {
     this.timeout(5000)
     var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {batchSize: 2, highWaterMark: 2})
     var query = client.query(stream)
     var pauser = tester.createPauseStream(0.1, 100)
-    query.pipe(JSONStream.stringify()).pipe(concat(function(json) {
+    query.pipe(JSONStream.stringify()).pipe(pauser).pipe(concat(function (json) {
       JSON.parse(json)
       done()
     }))

test/slow-reader.js

@@ -1,4 +1,3 @@
-var assert = require('assert')
 var helper = require('./helper')
 var QueryStream = require('../')
 var concat = require('concat-stream')
@@ -7,22 +6,20 @@ var Transform = require('stream').Transform
 
 var mapper = new Transform({objectMode: true})
 
-mapper._transform = function(obj, enc, cb) {
-  this.push(obj)
-  setTimeout(cb, 5)
+mapper._transform = function (obj, enc, cb) {
+  this.push(obj)
+  setTimeout(cb, 5)
 }
 
-helper('slow reader', function(client) {
-  it('works', function(done) {
+helper('slow reader', function (client) {
+  it('works', function (done) {
     this.timeout(50000)
     var stream = new QueryStream('SELECT * FROM generate_series(0, 201) num', [], {highWaterMark: 100, batchSize: 50})
-    stream.on('end', function() {
-      //console.log('stream end')
+    stream.on('end', function () {
+      // console.log('stream end')
     })
-    var query = client.query(stream)
-    var result = []
-    var count = 0
-    stream.pipe(mapper).pipe(concat(function(res) {
+    client.query(stream)
+    stream.pipe(mapper).pipe(concat(function (res) {
       done()
     }))
   })

test/stream-tester-timestamp.js

@@ -1,29 +1,26 @@
-var pg = require('pg.js')
 var QueryStream = require('../')
 var spec = require('stream-spec')
 var assert = require('assert')
 
-require('./helper')('stream tester timestamp', function(client) {
-  it('should not warn about max listeners', function(done) {
+require('./helper')('stream tester timestamp', function (client) {
+  it('should not warn about max listeners', function (done) {
     var sql = 'SELECT * FROM generate_series(\'1983-12-30 00:00\'::timestamp, \'2013-12-30 00:00\', \'1 years\')'
     var result = []
     var stream = new QueryStream(sql, [])
     var ended = false
     var query = client.query(stream)
-    query.
-    on('end', function() { ended = true })
+    query.on('end', function () { ended = true })
     spec(query)
       .readable()
-      .pausable({strict: true})
+      .pausable({ strict: true })
       .validateOnExit()
-    ;
-    var checkListeners = function() {
+    var checkListeners = function () {
       assert(stream.listeners('end').length < 10)
-      if (!ended)
+      if (!ended) {
         setImmediate(checkListeners)
-      else
+      } else {
         done()
+      }
     }
     checkListeners()
   })
 })

test/stream-tester.js

@@ -1,16 +1,15 @@
 var tester = require('stream-tester')
 var spec = require('stream-spec')
 
 var QueryStream = require('../')
 
-require('./helper')('stream tester', function(client) {
-  it('passes stream spec', function(done) {
+require('./helper')('stream tester', function (client) {
+  it('passes stream spec', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
     var query = client.query(stream)
 
     spec(query)
-        .readable()
-        .pausable({strict: true})
-        .validateOnExit()
+      .readable()
+      .pausable({strict: true})
+      .validateOnExit()
 
     stream.on('end', done)
   })
 })

yarn.lock (new file, 1315 lines)

File diff suppressed because it is too large.