From 6ab80d3995711435f63613bdfd5f241f0d94f823 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Thu, 30 Oct 2014 18:38:44 -0400 Subject: [PATCH] Add close method & supporting tests --- index.js | 28 +++++++++++++++++++++------- package.json | 2 +- test/close.js | 22 +++++++++++++++++++++- test/slow-reader.js | 29 +++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 9 deletions(-) create mode 100644 test/slow-reader.js diff --git a/index.js b/index.js index e346457b..a5fc2da3 100644 --- a/index.js +++ b/index.js @@ -3,7 +3,9 @@ var Cursor = require('pg-cursor') var Readable = require('readable-stream').Readable var QueryStream = module.exports = function(text, values, options) { - var self = this; + var self = this + this._reading = false + this._closing = false options = options || { } Cursor.call(this, text, values) Readable.call(this, { @@ -13,12 +15,15 @@ var QueryStream = module.exports = function(text, values, options) { this.batchSize = options.batchSize || 100 this.once('end', function() { process.nextTick(function() { - self.emit('close') - }); - }) + self.emit('close') + }) + }) } util.inherits(QueryStream, Readable) + +//copy cursor prototype to QueryStream +//so we can handle all the events emitted by the connection for(var key in Cursor.prototype) { if(key == 'read') { QueryStream.prototype._fetch = Cursor.prototype.read @@ -27,10 +32,19 @@ for(var key in Cursor.prototype) { } } - +QueryStream.prototype.close = function() { + this._closing = true + var self = this + Cursor.prototype.close.call(this, function(err) { + if(err) return self.emit('error', err) + process.nextTick(function() { + self.push(null) + }) + }) +} QueryStream.prototype._read = function(n) { - if(this._reading) return false; + if(this._reading || this._closing) return false this._reading = true var self = this this._fetch(this.batchSize, function(err, rows) { @@ -41,7 +55,7 @@ QueryStream.prototype._read = function(n) { process.nextTick(function() { self.push(null) }) - return; + return } self._reading = false for(var i = 0; i < rows.length; i++) { diff --git a/package.json b/package.json index 1ac30f4c..1f9d0744 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "mocha": "~1.17.1" }, "dependencies": { - "pg-cursor": "0.2.0", + "pg-cursor": "1.0.0", "readable-stream": "^1.0.27-1" } } diff --git a/test/close.js b/test/close.js index 6f319a75..d41fa621 100644 --- a/test/close.js +++ b/test/close.js @@ -4,8 +4,9 @@ var tester = require('stream-tester') var JSONStream = require('JSONStream') var QueryStream = require('../') +var helper = require('./helper') -require('./helper')('close', function(client) { +helper('close', function(client) { it('emits close', function(done) { var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2}) var query = client.query(stream) @@ -13,3 +14,22 @@ require('./helper')('close', function(client) { query.on('close', done) }) }) + +helper('early close', function(client) { + it('can be closed early', function(done) { + var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2}) + var query = client.query(stream) + var readCount = 0 + query.on('readable', function() { + readCount++ + query.read() + }) + query.once('readable', function() { + query.close() + }) + query.on('close', function() { + assert(readCount < 10, 'should not have read more than 10 rows') + done() + }) + }) +}) diff --git a/test/slow-reader.js b/test/slow-reader.js new file mode 100644 index 00000000..b9675a36 --- /dev/null +++ b/test/slow-reader.js @@ -0,0 +1,29 @@ +var assert = require('assert') +var helper = require('./helper') +var QueryStream = require('../') +var concat = require('concat-stream') + +var Transform = require('stream').Transform + +var mapper = new Transform({objectMode: true}) + +mapper._transform = function(obj, enc, cb) { + this.push(obj) + setTimeout(cb, 5) +} + +helper('slow reader', function(client) { + it('works', function(done) { + this.timeout(50000) + var stream = new QueryStream('SELECT * FROM generate_series(0, 201) num', [], {highWaterMark: 100, batchSize: 50}) + stream.on('end', function() { + //console.log('stream end') + }) + var query = client.query(stream) + var result = [] + var count = 0 + stream.pipe(mapper).pipe(concat(function(res) { + done() + })) + }) +})