commit b1f8f8d60d (parent e517b8ce14)
Brian M. Carlson, 2017-08-06 11:59:47 -05:00
15 changed files with 1413 additions and 94 deletions

.eslintrc Normal file (9 lines)

@@ -0,0 +1,9 @@
+{
+  "extends": "standard",
+  "env": {
+    "mocha": true
+  },
+  "rules": {
+    "no-new-func": "off"
+  }
+}
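
The config extends eslint-config-standard and turns off its no-new-func rule. As an illustrative sketch (not code from this commit), this is the kind of code that rule would normally flag and that the override permits:

    // hypothetical example: "no-new-func" normally rejects the Function constructor
    var add = new Function('a', 'b', 'return a + b')
    console.log(add(2, 3)) // 5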


@@ -1,10 +1,9 @@
-'use strict';
-var util = require('util')
+'use strict'
 var Cursor = require('pg-cursor')
 var Readable = require('stream').Readable
 
 class PgQueryStream extends Readable {
-  constructor(text, values, options) {
+  constructor (text, values, options) {
     super(Object.assign({ objectMode: true }, options))
     this.cursor = new Cursor(text, values)
     this._reading = false
@@ -20,18 +19,18 @@ class PgQueryStream extends Readable {
     this.handleError = this.cursor.handleError.bind(this.cursor)
   }
 
-  submit(connection) {
+  submit (connection) {
     this.cursor.submit(connection)
     return this
   }
 
-  close(callback) {
+  close (callback) {
     this._closed = true
     const cb = callback || (() => this.emit('close'))
     this.cursor.close(cb)
  }
 
-  _read(size) {
+  _read (size) {
     if (this._reading || this._closed) {
       return false
     }
@@ -53,7 +52,7 @@ class PgQueryStream extends Readable {
       // push each row into the stream
       this._reading = false
 
-      for(var i = 0; i < rows.length; i++) {
+      for (var i = 0; i < rows.length; i++) {
        this.push(rows[i])
      }
    })
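
For orientation, a minimal usage sketch of the class this file exports (not part of the commit; it assumes pg and pg-query-stream are installed and mirrors the pattern the tests below use):

    var pg = require('pg')
    var QueryStream = require('pg-query-stream')

    var client = new pg.Client()
    client.connect()
    // client.query(submittable) returns the submittable, so the stream can be consumed directly
    var query = client.query(new QueryStream('SELECT * FROM generate_series(0, $1) num', [100]))
    query.on('data', function (row) { console.log(row.num) })
    query.on('end', function () { client.end() })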


@@ -24,6 +24,8 @@
   "devDependencies": {
     "JSONStream": "~0.7.1",
     "concat-stream": "~1.0.1",
+    "eslint": "^4.4.0",
+    "eslint-config-standard": "^10.2.1",
     "mocha": "^3.5.0",
     "pg": "6.x",
     "stream-spec": "~0.3.5",
@@ -31,6 +33,10 @@
     "through": "~2.3.4"
   },
   "dependencies": {
-    "pg-cursor": "1.2.1"
+    "eslint-plugin-import": "^2.7.0",
+    "eslint-plugin-node": "^5.1.1",
+    "eslint-plugin-promise": "^3.5.0",
+    "eslint-plugin-standard": "^3.0.1",
+    "pg-cursor": "1.3.0"
   }
 }


@@ -1,33 +1,31 @@
 var assert = require('assert')
 var concat = require('concat-stream')
-var tester = require('stream-tester')
-var JSONStream = require('JSONStream')
 
 var QueryStream = require('../')
 var helper = require('./helper')
 
-helper('close', function(client) {
-  it('emits close', function(done) {
+helper('close', function (client) {
+  it('emits close', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
     var query = client.query(stream)
-    query.pipe(concat(function() {}))
+    query.pipe(concat(function () {}))
     query.on('close', done)
   })
 })
 
-helper('early close', function(client) {
-  it('can be closed early', function(done) {
+helper('early close', function (client) {
+  it('can be closed early', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
     var query = client.query(stream)
     var readCount = 0
-    query.on('readable', function() {
+    query.on('readable', function () {
       readCount++
       query.read()
     })
-    query.once('readable', function() {
+    query.once('readable', function () {
       query.close()
     })
-    query.on('close', function() {
+    query.on('close', function () {
       assert(readCount < 10, 'should not have read more than 10 rows')
       done()
     })
@@ -36,19 +34,19 @@ helper('early close', function(client) {
 
 helper('close callback', function (client) {
   it('notifies an optional callback when the connection is closed', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2});
-    var query = client.query(stream);
-    query.once('readable', function() { // only reading once
-      query.read();
-    });
-    query.once('readable', function() {
+    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
+    var query = client.query(stream)
+    query.once('readable', function () { // only reading once
+      query.read()
+    })
+    query.once('readable', function () {
       query.close(function () {
         // nothing to assert. This test will time out if the callback does not work.
-        done();
-      });
-    });
+        done()
+      })
+    })
     query.on('close', function () {
-      assert(false, "close event should not fire"); // no close event because we did not read to the end of the stream.
-    });
-  });
-});
+      assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
+    })
+  })
+})


@@ -5,14 +5,14 @@ var helper = require('./helper')
 
 var QueryStream = require('../')
 
-helper('concat', function(client) {
-  it('concats correctly', function(done) {
+helper('concat', function (client) {
+  it('concats correctly', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
     var query = client.query(stream)
-    query.pipe(through(function(row) {
+    query.pipe(through(function (row) {
       this.push(row.num)
-    })).pipe(concat(function(result) {
-      var total = result.reduce(function(prev, cur) {
+    })).pipe(concat(function (result) {
+      var total = result.reduce(function (prev, cur) {
         return prev + cur
       })
       assert.equal(total, 20100)


@@ -3,20 +3,20 @@ var helper = require('./helper')
 
 var QueryStream = require('../')
 
-helper('error', function(client) {
-  it('receives error on stream', function(done) {
+helper('error', function (client) {
+  it('receives error on stream', function (done) {
     var stream = new QueryStream('SELECT * FROM asdf num', [])
     var query = client.query(stream)
-    query.on('error', function(err) {
+    query.on('error', function (err) {
       assert(err)
       assert.equal(err.code, '42P01')
       done()
     }).on('data', function () {
-      //noop to kick of reading
+      // noop to kick off reading
     })
   })
 
-  it('continues to function after stream', function(done) {
+  it('continues to function after stream', function (done) {
     client.query('SELECT NOW()', done)
   })
 })


@@ -2,27 +2,26 @@ var assert = require('assert')
 var helper = require('./helper')
 var QueryStream = require('../')
 
-helper('fast reader', function(client) {
-  it('works', function(done) {
+helper('fast reader', function (client) {
+  it('works', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
     var query = client.query(stream)
     var result = []
-    var count = 0
-    stream.on('readable', function() {
+    stream.on('readable', function () {
       var res = stream.read()
       if (result.length !== 201) {
         assert(res, 'should not return null on evented reader')
       } else {
-        //a readable stream will emit a null datum when it finishes being readable
-        //https://nodejs.org/api/stream.html#stream_event_readable
+        // a readable stream will emit a null datum when it finishes being readable
+        // https://nodejs.org/api/stream.html#stream_event_readable
         assert.equal(res, null)
       }
-      if(res) {
+      if (res) {
         result.push(res.num)
       }
     })
-    stream.on('end', function() {
-      var total = result.reduce(function(prev, cur) {
+    stream.on('end', function () {
+      var total = result.reduce(function (prev, cur) {
         return prev + cur
       })
       assert.equal(total, 20100)


@@ -1,15 +1,15 @@
 var pg = require('pg')
 
-module.exports = function(name, cb) {
-  describe(name, function() {
+module.exports = function (name, cb) {
+  describe(name, function () {
     var client = new pg.Client()
 
-    before(function(done) {
+    before(function (done) {
       client.connect(done)
     })
 
     cb(client)
-    after(function(done) {
+    after(function (done) {
       client.end()
       client.on('end', done)
     })


@@ -3,11 +3,11 @@ var concat = require('concat-stream')
 
 var QueryStream = require('../')
 
-require('./helper')('instant', function(client) {
-  it('instant', function(done) {
+require('./helper')('instant', function (client) {
+  it('instant', function (done) {
     var query = new QueryStream('SELECT pg_sleep(1)', [])
     var stream = client.query(query)
-    stream.pipe(concat(function(res) {
+    stream.pipe(concat(function (res) {
       assert.equal(res.length, 1)
       done()
     }))


@@ -1,7 +1,7 @@
 var pg = require('pg')
 var QueryStream = require('../')
-describe('end semantics race condition', function() {
-  before(function(done) {
+describe('end semantics race condition', function () {
+  before(function (done) {
     var client = new pg.Client()
     client.connect()
     client.on('drain', client.end.bind(client))
@@ -9,20 +9,20 @@ describe('end semantics race condition', function() {
     client.query('create table IF NOT EXISTS p(id serial primary key)')
     client.query('create table IF NOT EXISTS c(id int primary key references p)')
   })
 
-  it('works', function(done) {
+  it('works', function (done) {
     var client1 = new pg.Client()
     client1.connect()
     var client2 = new pg.Client()
     client2.connect()
-    var qr = new QueryStream("INSERT INTO p DEFAULT VALUES RETURNING id")
+    var qr = new QueryStream('INSERT INTO p DEFAULT VALUES RETURNING id')
     client1.query(qr)
     var id = null
-    qr.on('data', function(row) {
+    qr.on('data', function (row) {
       id = row.id
     })
     qr.on('end', function () {
-      client2.query("INSERT INTO c(id) VALUES ($1)", [id], function (err, rows) {
+      client2.query('INSERT INTO c(id) VALUES ($1)', [id], function (err, rows) {
         client1.end()
         client2.end()
         done(err)


@@ -1,16 +1,16 @@
 var assert = require('assert')
 var concat = require('concat-stream')
 var tester = require('stream-tester')
 var JSONStream = require('JSONStream')
 
 var QueryStream = require('../')
-require('./helper')('pauses', function(client) {
-  it('pauses', function(done) {
+require('./helper')('pauses', function (client) {
+  it('pauses', function (done) {
     this.timeout(5000)
     var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {batchSize: 2, highWaterMark: 2})
     var query = client.query(stream)
     var pauser = tester.createPauseStream(0.1, 100)
-    query.pipe(JSONStream.stringify()).pipe(concat(function(json) {
+    query.pipe(JSONStream.stringify()).pipe(pauser).pipe(concat(function (json) {
       JSON.parse(json)
       done()
     }))


@@ -1,4 +1,3 @@
-var assert = require('assert')
 var helper = require('./helper')
 var QueryStream = require('../')
 var concat = require('concat-stream')
@@ -7,22 +6,20 @@ var Transform = require('stream').Transform
 var mapper = new Transform({objectMode: true})
 
-mapper._transform = function(obj, enc, cb) {
-  this.push(obj)
-  setTimeout(cb, 5)
+mapper._transform = function (obj, enc, cb) {
+  this.push(obj)
+  setTimeout(cb, 5)
 }
 
-helper('slow reader', function(client) {
-  it('works', function(done) {
+helper('slow reader', function (client) {
+  it('works', function (done) {
     this.timeout(50000)
     var stream = new QueryStream('SELECT * FROM generate_series(0, 201) num', [], {highWaterMark: 100, batchSize: 50})
-    stream.on('end', function() {
-      //console.log('stream end')
+    stream.on('end', function () {
+      // console.log('stream end')
     })
 
-    var query = client.query(stream)
-    var result = []
-    var count = 0
-    stream.pipe(mapper).pipe(concat(function(res) {
+    client.query(stream)
+    stream.pipe(mapper).pipe(concat(function (res) {
      done()
    }))
  })


@@ -1,26 +1,23 @@
-var pg = require('pg')
 var QueryStream = require('../')
 var spec = require('stream-spec')
 var assert = require('assert')
 
-require('./helper')('stream tester timestamp', function(client) {
-  it('should not warn about max listeners', function(done) {
+require('./helper')('stream tester timestamp', function (client) {
+  it('should not warn about max listeners', function (done) {
     var sql = 'SELECT * FROM generate_series(\'1983-12-30 00:00\'::timestamp, \'2013-12-30 00:00\', \'1 years\')'
-    var result = []
     var stream = new QueryStream(sql, [])
     var ended = false
     var query = client.query(stream)
-    query.on('end', function() { ended = true })
+    query.on('end', function () { ended = true })
     spec(query)
       .readable()
       .pausable({ strict: true })
-      .validateOnExit();
-    var checkListeners = function() {
+      .validateOnExit()
+    var checkListeners = function () {
       assert(stream.listeners('end').length < 10)
       if (!ended) {
         setImmediate(checkListeners)
-      }
-      else {
+      } else {
         done()
       }
     }


@@ -1,16 +1,15 @@
-var tester = require('stream-tester')
 var spec = require('stream-spec')
 var QueryStream = require('../')
 
-require('./helper')('stream tester', function(client) {
-  it('passes stream spec', function(done) {
+require('./helper')('stream tester', function (client) {
+  it('passes stream spec', function (done) {
     var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
     var query = client.query(stream)
     spec(query)
-    .readable()
-    .pausable({strict: true})
-    .validateOnExit()
+      .readable()
+      .pausable({strict: true})
+      .validateOnExit()
 
     stream.on('end', done)
   })
 })

yarn.lock Normal file (1315 lines)
File diff suppressed because it is too large