mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
WIP
This commit is contained in:
parent
c9e21f4161
commit
e517b8ce14
@ -1,9 +1,7 @@
|
||||
language: node_js
|
||||
node_js:
|
||||
- "0.10"
|
||||
- "0.12"
|
||||
- "4.2"
|
||||
- "6"
|
||||
- "7"
|
||||
- "8"
|
||||
env:
|
||||
- PGUSER=postgres PGDATABASE=postgres
|
||||
|
||||
118
index.js
118
index.js
@ -1,69 +1,63 @@
|
||||
'use strict';
|
||||
var util = require('util')
|
||||
var Cursor = require('pg-cursor')
|
||||
var Readable = require('readable-stream').Readable
|
||||
var Readable = require('stream').Readable
|
||||
|
||||
var QueryStream = module.exports = function(text, values, options) {
|
||||
var self = this
|
||||
this._reading = false
|
||||
this._closing = false
|
||||
options = options || { }
|
||||
Cursor.call(this, text, values)
|
||||
Readable.call(this, {
|
||||
objectMode: true,
|
||||
highWaterMark: options.highWaterMark || 1000
|
||||
})
|
||||
this.batchSize = options.batchSize || 100
|
||||
this.once('end', function() {
|
||||
process.nextTick(function() {
|
||||
self.emit('close')
|
||||
class PgQueryStream extends Readable {
|
||||
constructor(text, values, options) {
|
||||
super(Object.assign({ objectMode: true }, options))
|
||||
this.cursor = new Cursor(text, values)
|
||||
this._reading = false
|
||||
this._closed = false
|
||||
this.batchSize = (options || { }).batchSize || 100
|
||||
|
||||
// delegate Submittable callbacks to cursor
|
||||
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
|
||||
this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
|
||||
this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
|
||||
this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
|
||||
this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
|
||||
this.handleError = this.cursor.handleError.bind(this.cursor)
|
||||
}
|
||||
|
||||
submit(connection) {
|
||||
this.cursor.submit(connection)
|
||||
return this
|
||||
}
|
||||
|
||||
close(callback) {
|
||||
this._closed = true
|
||||
const cb = callback || (() => this.emit('close'))
|
||||
this.cursor.close(cb)
|
||||
}
|
||||
|
||||
_read(size) {
|
||||
if (this._reading || this._closed) {
|
||||
return false
|
||||
}
|
||||
this._reading = true
|
||||
const readAmount = Math.max(size, this.batchSize)
|
||||
this.cursor.read(readAmount, (err, rows) => {
|
||||
if (this._closed) {
|
||||
return
|
||||
}
|
||||
if (err) {
|
||||
return this.emit('error', err)
|
||||
}
|
||||
// if we get a 0 length array we've read to the end of the cursor
|
||||
if (!rows.length) {
|
||||
this._closed = true
|
||||
setImmediate(() => this.emit('close'))
|
||||
return this.push(null)
|
||||
}
|
||||
|
||||
// push each row into the stream
|
||||
this._reading = false
|
||||
for(var i = 0; i < rows.length; i++) {
|
||||
this.push(rows[i])
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
util.inherits(QueryStream, Readable)
|
||||
|
||||
//copy cursor prototype to QueryStream
|
||||
//so we can handle all the events emitted by the connection
|
||||
for(var key in Cursor.prototype) {
|
||||
if(key == 'read') {
|
||||
QueryStream.prototype._fetch = Cursor.prototype.read
|
||||
} else {
|
||||
QueryStream.prototype[key] = Cursor.prototype[key]
|
||||
}
|
||||
}
|
||||
|
||||
QueryStream.prototype.close = function(cb) {
|
||||
this._closing = true
|
||||
var self = this
|
||||
Cursor.prototype.close.call(this, function(err) {
|
||||
if (cb) { cb(err); }
|
||||
if(err) return self.emit('error', err)
|
||||
process.nextTick(function() {
|
||||
self.push(null)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
QueryStream.prototype._read = function(n) {
|
||||
if(this._reading || this._closing) return false
|
||||
this._reading = true
|
||||
var self = this
|
||||
this._fetch(this.batchSize, function(err, rows) {
|
||||
if(err) {
|
||||
return self.emit('error', err)
|
||||
}
|
||||
|
||||
if (self._closing) { return; }
|
||||
|
||||
if(!rows.length) {
|
||||
process.nextTick(function() {
|
||||
self.push(null)
|
||||
})
|
||||
return
|
||||
}
|
||||
self._reading = false
|
||||
for(var i = 0; i < rows.length; i++) {
|
||||
self.push(rows[i])
|
||||
}
|
||||
})
|
||||
}
|
||||
module.exports = PgQueryStream
|
||||
|
||||
19
package.json
19
package.json
@ -4,7 +4,7 @@
|
||||
"description": "Postgres query result returned as readable stream",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"test": "mocha test/*.js -R spec"
|
||||
"test": "mocha"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
@ -17,21 +17,20 @@
|
||||
"stream"
|
||||
],
|
||||
"author": "Brian M. Carlson",
|
||||
"license": "BSD-2-Clause",
|
||||
"license": "MIT",
|
||||
"bugs": {
|
||||
"url": "https://github.com/brianc/node-pg-query-stream/issues"
|
||||
},
|
||||
"devDependencies": {
|
||||
"pg.js": "*",
|
||||
"concat-stream": "~1.0.1",
|
||||
"through": "~2.3.4",
|
||||
"stream-tester": "0.0.5",
|
||||
"stream-spec": "~0.3.5",
|
||||
"JSONStream": "~0.7.1",
|
||||
"mocha": "~1.17.1"
|
||||
"concat-stream": "~1.0.1",
|
||||
"mocha": "^3.5.0",
|
||||
"pg": "6.x",
|
||||
"stream-spec": "~0.3.5",
|
||||
"stream-tester": "0.0.5",
|
||||
"through": "~2.3.4"
|
||||
},
|
||||
"dependencies": {
|
||||
"pg-cursor": "1.0.0",
|
||||
"readable-stream": "^2.0.4"
|
||||
"pg-cursor": "1.2.1"
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,52 +34,6 @@ helper('early close', function(client) {
|
||||
})
|
||||
})
|
||||
|
||||
helper('should not throw errors after early close', function(client) {
|
||||
it('can be closed early without error', function(done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 2000) num');
|
||||
var query = client.query(stream);
|
||||
var fetchCount = 0;
|
||||
var errorCount = 0;
|
||||
|
||||
|
||||
function waitForErrors() {
|
||||
|
||||
setTimeout(function () {
|
||||
assert(errorCount === 0, 'should not throw a ton of errors');
|
||||
done();
|
||||
}, 10);
|
||||
}
|
||||
|
||||
// hack internal _fetch function to force query.close immediately after _fetch is called (simulating the race condition)
|
||||
// race condition: if close is called immediately after _fetch is called, but before results are returned, errors are thrown
|
||||
// when the fetch results are pushed to the readable stream after its already closed.
|
||||
query._fetch = (function (_fetch) {
|
||||
return function () {
|
||||
|
||||
// wait for the second fetch. closing immediately after the first fetch throws an entirely different error :(
|
||||
if (fetchCount++ === 0) {
|
||||
return _fetch.apply(this, arguments);
|
||||
}
|
||||
|
||||
var results = _fetch.apply(this, arguments);
|
||||
|
||||
query.close();
|
||||
waitForErrors();
|
||||
|
||||
query._fetch = _fetch; // we're done with our hack, so restore the original _fetch function.
|
||||
|
||||
return results;
|
||||
}
|
||||
}(query._fetch));
|
||||
|
||||
query.on('error', function () { errorCount++; });
|
||||
|
||||
query.on('readable', function () {
|
||||
query.read();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
helper('close callback', function (client) {
|
||||
it('notifies an optional callback when the conneciton is closed', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2});
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
var pg = require('pg.js')
|
||||
var pg = require('pg')
|
||||
module.exports = function(name, cb) {
|
||||
describe(name, function() {
|
||||
var client = new pg.Client()
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
var pg = require('pg.js')
|
||||
var pg = require('pg')
|
||||
var QueryStream = require('../')
|
||||
describe('end semantics race condition', function() {
|
||||
before(function(done) {
|
||||
|
||||
@ -1 +1,2 @@
|
||||
--no-exit
|
||||
--bail
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
var pg = require('pg.js')
|
||||
var pg = require('pg')
|
||||
var QueryStream = require('../')
|
||||
var spec = require('stream-spec')
|
||||
var assert = require('assert')
|
||||
@ -10,20 +10,20 @@ require('./helper')('stream tester timestamp', function(client) {
|
||||
var stream = new QueryStream(sql, [])
|
||||
var ended = false
|
||||
var query = client.query(stream)
|
||||
query.
|
||||
on('end', function() { ended = true })
|
||||
query.on('end', function() { ended = true })
|
||||
spec(query)
|
||||
.readable()
|
||||
.pausable({strict: true})
|
||||
.validateOnExit()
|
||||
;
|
||||
.pausable({ strict: true })
|
||||
.validateOnExit();
|
||||
var checkListeners = function() {
|
||||
assert(stream.listeners('end').length < 10)
|
||||
if (!ended)
|
||||
if (!ended) {
|
||||
setImmediate(checkListeners)
|
||||
else
|
||||
}
|
||||
else {
|
||||
done()
|
||||
}
|
||||
}
|
||||
checkListeners()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user