Add close method & supporting tests

This commit is contained in:
Brian M. Carlson 2014-10-30 18:38:44 -04:00
parent df63cbbab7
commit 6ab80d3995
4 changed files with 72 additions and 9 deletions

View File

@ -3,7 +3,9 @@ var Cursor = require('pg-cursor')
var Readable = require('readable-stream').Readable
var QueryStream = module.exports = function(text, values, options) {
var self = this;
var self = this
this._reading = false
this._closing = false
options = options || { }
Cursor.call(this, text, values)
Readable.call(this, {
@ -13,12 +15,15 @@ var QueryStream = module.exports = function(text, values, options) {
this.batchSize = options.batchSize || 100
this.once('end', function() {
process.nextTick(function() {
self.emit('close')
});
})
self.emit('close')
})
})
}
util.inherits(QueryStream, Readable)
//copy cursor prototype to QueryStream
//so we can handle all the events emitted by the connection
for(var key in Cursor.prototype) {
if(key == 'read') {
QueryStream.prototype._fetch = Cursor.prototype.read
@ -27,10 +32,19 @@ for(var key in Cursor.prototype) {
}
}
// Terminate the stream early: flag the stream as closing so _read becomes a
// no-op, close the underlying cursor, and then signal EOF to any readers.
QueryStream.prototype.close = function() {
  var stream = this
  stream._closing = true
  Cursor.prototype.close.call(stream, function(err) {
    if (err) {
      stream.emit('error', err)
      return
    }
    // push EOF on the next tick so 'end'/'close' fire after the current turn
    process.nextTick(function() {
      stream.push(null)
    })
  })
}
QueryStream.prototype._read = function(n) {
if(this._reading) return false;
if(this._reading || this._closing) return false
this._reading = true
var self = this
this._fetch(this.batchSize, function(err, rows) {
@ -41,7 +55,7 @@ QueryStream.prototype._read = function(n) {
process.nextTick(function() {
self.push(null)
})
return;
return
}
self._reading = false
for(var i = 0; i < rows.length; i++) {

View File

@ -31,7 +31,7 @@
"mocha": "~1.17.1"
},
"dependencies": {
"pg-cursor": "0.2.0",
"pg-cursor": "1.0.0",
"readable-stream": "^1.0.27-1"
}
}

View File

@ -4,8 +4,9 @@ var tester = require('stream-tester')
var JSONStream = require('JSONStream')
var QueryStream = require('../')
var helper = require('./helper')
require('./helper')('close', function(client) {
helper('close', function(client) {
it('emits close', function(done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
var query = client.query(stream)
@ -13,3 +14,22 @@ require('./helper')('close', function(client) {
query.on('close', done)
})
})
// Closing mid-stream must stop row delivery: with 20k rows available, only a
// handful of readable pulses should happen before 'close' fires.
helper('early close', function(client) {
  it('can be closed early', function(done) {
    var qs = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
    var query = client.query(qs)
    var pulses = 0
    // drain each readable pulse, counting how many times data became available
    query.on('readable', function() {
      pulses++
      query.read()
    })
    // shut the stream down as soon as the first batch arrives
    query.once('readable', function() {
      query.close()
    })
    query.on('close', function() {
      assert(pulses < 10, 'should not have read more than 10 rows')
      done()
    })
  })
})

29
test/slow-reader.js Normal file
View File

@ -0,0 +1,29 @@
var assert = require('assert')
var helper = require('./helper')
var QueryStream = require('../')
var concat = require('concat-stream')
var Transform = require('stream').Transform
// Object-mode pass-through that waits 5ms after each row before asking for
// the next one — simulates a consumer that is slower than the producer.
var mapper = new Transform({objectMode: true})
mapper._transform = function(row, _enc, callback) {
  this.push(row)
  setTimeout(callback, 5)
}
// Backpressure test: pipe 202 rows through a deliberately slow Transform and
// confirm the stream still drains to completion (concat's callback fires).
// Cleanup: removed the unused locals `result`, `count`, the unused `query`
// binding (the client.query() side effect is kept), the unused `res` callback
// parameter, and a no-op 'end' listener that held only commented-out debug code.
helper('slow reader', function(client) {
  it('works', function(done) {
    // generous timeout: ~202 rows * 5ms per row plus query round-trips
    this.timeout(50000)
    var stream = new QueryStream('SELECT * FROM generate_series(0, 201) num', [], {highWaterMark: 100, batchSize: 50})
    client.query(stream)
    stream.pipe(mapper).pipe(concat(function() {
      done()
    }))
  })
})