From bb448fe61a8cacdfc99a87cb40427a95acf8fb28 Mon Sep 17 00:00:00 2001 From: bmc Date: Tue, 19 Feb 2013 19:34:28 -0600 Subject: [PATCH] finish out the first rev of the improved pool api --- lib/pool.js | 55 ++++++++++++++++------- test/unit/pool/basic-tests.js | 79 ++++++++++++++++++++++++++++++++- test/unit/pool/timeout-tests.js | 42 ++++++++++++++++++ 3 files changed, 157 insertions(+), 19 deletions(-) create mode 100644 test/unit/pool/timeout-tests.js diff --git a/lib/pool.js b/lib/pool.js index 60e01c19..c044592c 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -1,10 +1,12 @@ +var EventEmitter = require('events').EventEmitter; + var defaults = require(__dirname + '/defaults'); var genericPool = require('generic-pool'); //takes the same config structure as client -var createPool = function(config) { - config = config || {}; - var name = JSON.stringify(config); +var createPool = function(clientConfig) { + clientConfig = clientConfig || {}; + var name = JSON.stringify(clientConfig); var pool = createPool.all[name]; if(pool) { return pool; @@ -12,10 +14,22 @@ var createPool = function(config) { pool = genericPool.Pool({ name: name, max: defaults.poolSize, + idleTimeoutMillis: defaults.poolIdleTimeout, + reapIntervalMillis: defaults.reapIntervalMillis, + log: defaults.poolLog, create: function(cb) { - var client = new createPool.Client(config); + var client = new createPool.Client(clientConfig); client.connect(function(err) { - return cb(err, client); + if(err) return cb(err, null); + + //handle connected client background errors by emitting event + //via the pg object and then removing errored client from the pool + client.on('error', function(e) { + pool.emit('error', e, client); + pool.destroy(client); + }); + + return cb(null, client); }); }, destroy: function(client) { @@ -23,20 +37,23 @@ var createPool = function(config) { } }); createPool.all[name] = pool; + //mixin EventEmitter to pool + EventEmitter.call(pool); + for(var key in EventEmitter.prototype) { + if(EventEmitter.prototype.hasOwnProperty(key)) { + pool[key] = EventEmitter.prototype[key]; + } + } + //monkey-patch with connect method pool.connect = function(cb) { pool.acquire(function(err, client) { - //TODO: on connection error should we remove this client automatically? - if(err) { - return cb(err); - } - if(cb.length > 2) { - return newConnect(pool, client, cb); - } - return oldConnect(pool, client, cb); + if(err) return cb(err, null, function() {/*NOOP*/}); + //support both 2 (old) and 3 arguments + (cb.length > 2 ? newConnect : oldConnect)(pool, client, cb); }); }; return pool; -} +}; //the old connect method of the pool //would automatically subscribe to the 'drain' @@ -44,7 +61,7 @@ var createPool = function(config) { //the pool once 'drain' fired once. This caused //a bunch of problems, but for backwards compatibility //we're leaving it in -var alarmDuration = 1000; +var alarmDuration = 5000; var errorMessage = ['A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.', 'You might have a leak!', 'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {', @@ -64,8 +81,12 @@ var oldConnect = function(pool, client, cb) { }; var newConnect = function(pool, client, cb) { - cb(null, client, function() { - pool.release(client); + cb(null, client, function(err) { + if(err) { + pool.destroy(client); + } else { + pool.release(client); + } }); }; diff --git a/test/unit/pool/basic-tests.js b/test/unit/pool/basic-tests.js index ac1071bd..a19b9dd6 100644 --- a/test/unit/pool/basic-tests.js +++ b/test/unit/pool/basic-tests.js @@ -19,7 +19,7 @@ FakeClient.prototype.connect = function(cb) { } FakeClient.prototype.end = function() { - + this.endCalled = true; } //Hangs the event loop until 'end' is called on client @@ -59,7 +59,7 @@ test('pool creates pool on miss', function() { assert.equal(Object.keys(pool.all).length, 2); }); -test('pool follows default limits', function() { +test('pool follows defaults', function() { var p = pool(poolId++); for(var i = 0; i < 100; i++) { p.acquire(function(err, client) { @@ -101,3 +101,78 @@ test('pool#connect with 3 parameters', function() { p.destroyAllNow(); }); }); + +test('on client error, client is removed from pool', function() { + var p = pool(poolId++); + p.connect(assert.success(function(client) { + assert.ok(client); + client.emit('drain'); + assert.equal(p.availableObjectsCount(), 1); + assert.equal(p.getPoolSize(), 1); + //error event fires on pool BEFORE pool.destroy is called with client + assert.emits(p, 'error', function(err) { + assert.equal(err.message, 'test error'); + assert.ok(!client.endCalled); + assert.equal(p.availableObjectsCount(), 1); + assert.equal(p.getPoolSize(), 1); + //after we're done in our callback, pool.destroy is called + process.nextTick(function() { + assert.ok(client.endCalled); + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + p.destroyAllNow(); + }); + }); + client.emit('error', new Error('test error')); + })); +}); + +test('pool with connection error on connection', function() { + pool.Client = function() { + return { + connect: function(cb) { + process.nextTick(function() { + cb(new Error('Could not connect')); + }); + } + }; + } + test('two parameters', function() { + var p = pool(poolId++); + p.connect(assert.calls(function(err, client) { + assert.ok(err); + assert.equal(client, null); + //client automatically removed + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + })); + }); + test('three parameters', function() { + var p = pool(poolId++); + var tid = setTimeout(function() { + assert.fail('Did not call connect callback'); + }, 100); + p.connect(function(err, client, done) { + clearTimeout(tid); + assert.ok(err); + assert.equal(client, null); + //done does nothing + done(new Error('OH NOOOO')); + done(); + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + }); + }); +}); + +test('returnning an error to done()', function() { + var p = pool(poolId++); + pool.Client = FakeClient; + p.connect(function(err, client, done) { + assert.equal(err, null); + assert(client); + done(new Error("BROKEN")); + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + }); +}); diff --git a/test/unit/pool/timeout-tests.js b/test/unit/pool/timeout-tests.js new file mode 100644 index 00000000..a79e1d3a --- /dev/null +++ b/test/unit/pool/timeout-tests.js @@ -0,0 +1,42 @@ +var util = require('util'); +var EventEmitter = require('events').EventEmitter; + +var libDir = __dirname + '/../../../lib'; +var defaults = require(libDir + '/defaults'); +var pool = require(libDir + '/pool'); +var poolId = 0; + +require(__dirname + '/../../test-helper'); + +var FakeClient = function() { + EventEmitter.call(this); +} + +util.inherits(FakeClient, EventEmitter); + +FakeClient.prototype.connect = function(cb) { + process.nextTick(cb); +} + +FakeClient.prototype.end = function() { + this.endCalled = true; +} + +defaults.poolIdleTimeout = 10; +defaults.reapIntervalMillis = 10; + +test('client times out from idle', function() { + pool.Client = FakeClient; + var p = pool(poolId++); + p.connect(function(err, client, done) { + done(); + }); + process.nextTick(function() { + assert.equal(p.availableObjectsCount(), 1); + assert.equal(p.getPoolSize(), 1); + setTimeout(function() { + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + }, 50); + }); +});