From 68819dffdac44d7248477ff4e6961a244fce2325 Mon Sep 17 00:00:00 2001 From: "matthew.blasius" Date: Thu, 5 Nov 2015 14:09:50 -0500 Subject: [PATCH] Avoid race when stream closed while fetching --- index.js | 3 +++ test/close.js | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/index.js b/index.js index a5fc2da3..4be25f1b 100644 --- a/index.js +++ b/index.js @@ -51,6 +51,9 @@ QueryStream.prototype._read = function(n) { if(err) { return self.emit('error', err) } + + if (self._closing) { return; } + if(!rows.length) { process.nextTick(function() { self.push(null) diff --git a/test/close.js b/test/close.js index d41fa621..35af30b5 100644 --- a/test/close.js +++ b/test/close.js @@ -33,3 +33,50 @@ helper('early close', function(client) { }) }) }) + + +helper('should not throw errors after early close', function(client) { + it('can be closed early without error', function(done) { + var stream = new QueryStream('SELECT * FROM generate_series(0, 2000) num'); + var query = client.query(stream); + var fetchCount = 0; + var errorCount = 0; + + + function waitForErrors() { + + setTimeout(function () { + assert(errorCount === 0, 'should not throw a ton of errors'); + done(); + }, 10); + } + + // hack internal _fetch function to force query.close immediately after _fetch is called (simulating the race condition) + // race condition: if close is called immediately after _fetch is called, but before results are returned, errors are thrown + // when the fetch results are pushed to the readable stream after its already closed. + query._fetch = (function (_fetch) { + return function () { + + // wait for the second fetch. closing immediately after the first fetch throws an entirely different error :( + if (fetchCount++ === 0) { + return _fetch.apply(this, arguments); + } + + var results = _fetch.apply(this, arguments); + + query.close(); + waitForErrors(); + + query._fetch = _fetch; // we're done with our hack, so restore the original _fetch function. + + return results; + } + }(query._fetch)); + + query.on('error', function () { errorCount++; }); + + query.on('readable', function () { + query.read(); + }); + }); +});