From ec1c70c4b5b6ad84ba40b57cfd94dff3cfa5485d Mon Sep 17 00:00:00 2001 From: brianc Date: Mon, 29 Aug 2011 23:43:36 -0500 Subject: [PATCH] ability to pause/resume drain event for long-running async transactions --- lib/client.js | 15 ++- lib/native/index.js | 13 ++- test/integration/client/drain-tests.js | 55 ++++++++++ test/unit/client/query-queue-tests.js | 138 ++++++++++++++++++------- 4 files changed, 180 insertions(+), 41 deletions(-) create mode 100644 test/integration/client/drain-tests.js diff --git a/lib/client.js b/lib/client.js index 979c8691..852b3087 100644 --- a/lib/client.js +++ b/lib/client.js @@ -129,7 +129,7 @@ p._pulseQueryQueue = function() { this.activeQuery.submit(this.connection); } else if(this.hasExecuted) { this.activeQuery = null; - this.emit('drain') + this._drainPaused > 0 ? this._drainPaused++ : this.emit('drain') } } }; @@ -154,6 +154,19 @@ p.query = function(config, values, callback) { return query; }; +//prevents client from otherwise emitting 'drain' event until 'resumeDrain' is called +p.pauseDrain = function() { + this._drainPaused = 1; +}; + +//resume raising 'drain' event +p.resumeDrain = function() { + if(this._drainPaused > 1) { + this.emit('drain'); + } + this._drainPaused = 0; +}; + p.end = function() { this.connection.end(); }; diff --git a/lib/native/index.js b/lib/native/index.js index 22027fbc..37b2befd 100644 --- a/lib/native/index.js +++ b/lib/native/index.js @@ -37,7 +37,7 @@ p._pulseQueryQueue = function(initialConnection) { var query = this._queryQueue.shift(); if(!query) { if(!initialConnection) { - this.emit('drain'); + this._drainPaused ? this._drainPaused++ : this.emit('drain'); } return; } @@ -60,6 +60,17 @@ p._pulseQueryQueue = function(initialConnection) { } } +p.pauseDrain = function() { + this._drainPaused = 1; +}; + +p.resumeDrain = function() { + if(this._drainPaused > 1) { + this.emit('drain') + }; + this._drainPaused = 0; +}; + var clientBuilder = function(config) { config = config || {}; var connection = new Connection(); diff --git a/test/integration/client/drain-tests.js b/test/integration/client/drain-tests.js new file mode 100644 index 00000000..0aff28eb --- /dev/null +++ b/test/integration/client/drain-tests.js @@ -0,0 +1,55 @@ +var helper = require(__dirname + '/test-helper'); +var pg = require(__dirname + '/../../../lib'); + +if(helper.args.native) { + pg = require(__dirname + '/../../../lib').native; +} + +var testDrainOfClientWithPendingQueries = function() { + pg.connect(helper.connectionString(), assert.success(function(client) { + test('when there are pending queries and client is resumed', function() { + var drainCount = 0; + client.on('drain', function() { + drainCount++; + }); + client.pauseDrain(); + client.query('SELECT NOW()', function() { + client.query('SELECT NOW()', function() { + assert.equal(drainCount, 0); + process.nextTick(function() { + assert.equal(drainCount, 1); + pg.end(); + }); + }); + client.resumeDrain(); + assert.equal(drainCount, 0); + }); + }); + })); +}; + +pg.connect(helper.connectionString(), assert.success(function(client) { + var drainCount = 0; + client.on('drain', function() { + drainCount++; + }); + test('pauseDrain and resumeDrain on simple client', function() { + client.pauseDrain(); + client.resumeDrain(); + process.nextTick(assert.calls(function() { + assert.equal(drainCount, 0); + test('drain is paused', function() { + client.pauseDrain(); + client.query('SELECT NOW()', assert.success(function() { + process.nextTick(function() { + assert.equal(drainCount, 0); + client.resumeDrain(); + assert.equal(drainCount, 1); + testDrainOfClientWithPendingQueries(); + }); + })); + }); + })); + }); +})); + diff --git a/test/unit/client/query-queue-tests.js b/test/unit/client/query-queue-tests.js index af95ae4d..cd87cfe9 100644 --- a/test/unit/client/query-queue-tests.js +++ b/test/unit/client/query-queue-tests.js @@ -1,52 +1,112 @@ var helper = require(__dirname + '/test-helper'); var Connection = require(__dirname + '/../../../lib/connection'); -var con = new Connection({stream: "NO"}); -var client = new Client({connection:con}); +test('drain', function() { + var con = new Connection({stream: "NO"}); + var client = new Client({connection:con}); + con.connect = function() { + con.emit('connect'); + }; + con.query = function() { + }; + client.connect(); -con.connect = function() { - con.emit('connect'); -}; -con.query = function() { -}; -client.connect(); + var raisedDrain = false; + client.on('drain', function() { + raisedDrain = true; + }); -var raisedDrain = false; -client.on('drain', function() { - raisedDrain = true; -}); + client.query("hello"); + client.query("sup"); + client.query('boom'); -client.query("hello"); -client.query("sup"); -client.query('boom'); + test("with pending queries", function() { + test("does not emit drain", function() { + assert.equal(raisedDrain, false); + }); + }); -test("with pending queries", function() { - test("does not emit drain", function() { - assert.equal(raisedDrain, false); + test("after some queries executed", function() { + con.emit('readyForQuery'); + test("does not emit drain", function() { + assert.equal(raisedDrain, false); + }); + }); + + test("when all queries are sent", function() { + con.emit('readyForQuery'); + con.emit('readyForQuery'); + test("does not emit drain", function() { + assert.equal(raisedDrain, false); + }); + }); + + test("after last query finishes", function() { + con.emit('readyForQuery'); + test("emits drain", function() { + process.nextTick(function() { + assert.ok(raisedDrain); + }) + }); }); }); -test("after some queries executed", function() { - con.emit('readyForQuery'); - test("does not emit drain", function() { - assert.equal(raisedDrain, false); - }); -}); +test('with drain paused', function() { + //mock out a fake connection + var con = new Connection({stream: "NO"}); + con.connect = function() { + con.emit('connect'); + }; + con.query = function() { + }; -test("when all queries are sent", function() { - con.emit('readyForQuery'); - con.emit('readyForQuery'); - test("does not emit drain", function() { - assert.equal(raisedDrain, false); - }); -}); + var client = new Client({connection:con}); -test("after last query finishes", function() { - con.emit('readyForQuery'); - test("emits drain", function() { - process.nextTick(function() { - assert.ok(raisedDrain); - }) - }); -}); + client.connect(); + var drainCount = 0; + client.on('drain', function() { + drainCount++; + }); + + test('normally unpaused', function() { + con.emit('readyForQuery'); + client.query('boom'); + assert.emits(client, 'drain', function() { + assert.equal(drainCount, 1); + }); + con.emit('readyForQuery'); + }); + + test('pausing', function() { + test('unpaused with no queries in between', function() { + client.pauseDrain(); + client.resumeDrain(); + assert.equal(drainCount, 1); + }); + + test('paused', function() { + test('resumeDrain after empty', function() { + client.pauseDrain(); + client.query('asdf'); + con.emit('readyForQuery'); + assert.equal(drainCount, 1); + client.resumeDrain(); + assert.equal(drainCount, 2); + }); + + test('resumDrain while still pending', function() { + client.pauseDrain(); + client.query('asdf'); + client.query('asdf1'); + con.emit('readyForQuery'); + client.resumeDrain(); + assert.equal(drainCount, 2); + con.emit('readyForQuery'); + assert.equal(drainCount, 3); + }); + + }); + }); + +});