From 87b52f9e516ec0d5b70554886d46cb40ddec0069 Mon Sep 17 00:00:00 2001 From: Tom Buchok Date: Wed, 9 Apr 2014 23:56:21 -0400 Subject: [PATCH 1/2] adds failing test - appears that timestamp queries emit a lot of `rows` with length == 0 - `self.once('end')` is added each of these times - assertion on listener count shows that more than 10 listeners are applied --- test/stream-tester-timestamp.js | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 test/stream-tester-timestamp.js diff --git a/test/stream-tester-timestamp.js b/test/stream-tester-timestamp.js new file mode 100644 index 00000000..227630a7 --- /dev/null +++ b/test/stream-tester-timestamp.js @@ -0,0 +1,29 @@ +var pg = require('pg.js') +var QueryStream = require('../') +var spec = require('stream-spec') +var assert = require('assert') + +require('./helper')(function(client) { + it('should not warn about max listeners', function(done) { + var sql = 'SELECT * FROM generate_series(\'1983-12-30 00:00\'::timestamp, \'2013-12-30 00:00\', \'1 years\')' + var result = [] + var stream = new QueryStream(sql, []) + var ended = false + var query = client.query(stream) + query. + on('end', function() { ended = true }) + spec(query) + .readable() + .pausable({strict: true}) + .validateOnExit() + ; + var checkListeners = function() { + assert(stream.listeners('end').length < 10) + if (!ended) + setImmediate(checkListeners) + else + done() + } + checkListeners() + }) +}) \ No newline at end of file From cab956ba5030b146ea2132954588023b062ed02e Mon Sep 17 00:00:00 2001 From: Tom Buchok Date: Wed, 9 Apr 2014 23:58:18 -0400 Subject: [PATCH 2/2] passes `stream-tester-timestamp` - moves 'end' event listener to constructor, only listen once - ensures all existing tests still green --- index.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index f50f0af9..89d3aa9b 100644 --- a/index.js +++ b/index.js @@ -11,6 +11,9 @@ var QueryStream = module.exports = function(text, values, options) { }) this.batchSize = options.batchSize || 100 this._ready = false + this.once('end', function() { + setImmediate(function() { this.emit('close') }.bind(this)); + }) //kick reader this.read() } @@ -35,7 +38,6 @@ QueryStream.prototype._read = function(n) { if(!rows.length) { setImmediate(function() { self.push(null) - self.once('end', self.emit.bind(self, 'close')) }) } self._reading = false