diff --git a/lib/index.js b/lib/index.js index f05ec152..6dab1339 100644 --- a/lib/index.js +++ b/lib/index.js @@ -2,26 +2,26 @@ var EventEmitter = require('events').EventEmitter; var util = require('util'); var Client = require(__dirname+'/client'); var defaults = require(__dirname + '/defaults'); - -//external genericPool module -var genericPool = require('generic-pool'); - -//cache of existing client pools -var pools = {}; +var pool = require(__dirname + '/pool'); +var types = require(__dirname + '/types'); +var Connection = require(__dirname + '/connection'); var PG = function(clientConstructor) { EventEmitter.call(this); - this.Client = clientConstructor; - this.Connection = require(__dirname + '/connection'); - this.Query = clientConstructor.Query; this.defaults = defaults; + this.Client = pool.Client = clientConstructor; + this.Query = this.Client.Query; + this.pools = pool; + this.types = types; + this.Connection = Connection; }; util.inherits(PG, EventEmitter); PG.prototype.end = function() { - Object.keys(pools).forEach(function(name) { - var pool = pools[name]; + var self = this; + Object.keys(self.pools.all).forEach(function(key) { + var pool = self.pools.all[key]; pool.drain(function() { pool.destroyAllNow(); }); @@ -29,51 +29,16 @@ PG.prototype.end = function() { }; PG.prototype.connect = function(config, callback) { - var self = this; - var c = config; - var cb = callback; - //allow for no config to be passed - if(typeof c === 'function') { - cb = c; - c = defaults; + if(typeof config == "function") { + callback = config; + config = null; + } + var pool = this.pools.getOrCreate(config); + pool.connect(callback); + if(!pool.listeners('error').length) { + //propagate errors up to pg object + pool.on('error', this.emit.bind(this, 'error')); } - - //get unique pool name even if object was used as config - var poolName = typeof(c) === 'string' ? c : c.user+c.host+c.port+c.database; - var pool = pools[poolName]; - - if(pool) { return pool.acquire(cb); } - - pool = pools[poolName] = genericPool.Pool({ - name: poolName, - create: function(callback) { - var client = new self.Client(c); - client.connect(function(err) { - if(err) { return callback(err); } - - //handle connected client background errors by emitting event - //via the pg object and then removing errored client from the pool - client.on('error', function(e) { - self.emit('error', e, client); - pool.destroy(client); - }); - - callback(null, client); - }); - - client.on('drain', function() { - pool.release(client); - }); - }, - destroy: function(client) { - client.end(); - }, - max: defaults.poolSize, - idleTimeoutMillis: defaults.poolIdleTimeout, - reapIntervalMillis: defaults.reapIntervalMillis, - log: defaults.poolLog - }); - return pool.acquire(cb); }; // cancel the query runned by the given client @@ -96,4 +61,3 @@ module.exports.__defineGetter__("native", function() { return module.exports.native; }); -module.exports.types = require('./types'); diff --git a/lib/pool.js b/lib/pool.js new file mode 100644 index 00000000..15cf77ed --- /dev/null +++ b/lib/pool.js @@ -0,0 +1,107 @@ +var EventEmitter = require('events').EventEmitter; + +var defaults = require(__dirname + '/defaults'); +var genericPool = require('generic-pool'); + +var pools = { + //dictionary of all key:pool pairs + all: {}, + //reference to the client constructor - can override in tests or for require('pg').native + Client: require(__dirname + '/client'), + getOrCreate: function(clientConfig) { + clientConfig = clientConfig || {}; + var name = JSON.stringify(clientConfig); + var pool = pools.all[name]; + if(pool) { + return pool; + } + pool = genericPool.Pool({ + name: name, + max: defaults.poolSize, + idleTimeoutMillis: defaults.poolIdleTimeout, + reapIntervalMillis: defaults.reapIntervalMillis, + log: defaults.poolLog, + create: function(cb) { + var client = new pools.Client(clientConfig); + client.connect(function(err) { + if(err) return cb(err, null); + + //handle connected client background errors by emitting event + //via the pg object and then removing errored client from the pool + client.on('error', function(e) { + pool.emit('error', e, client); + pool.destroy(client); + }); + + return cb(null, client); + }); + }, + destroy: function(client) { + client.end(); + } + }); + pools.all[name] = pool; + //mixin EventEmitter to pool + EventEmitter.call(pool); + for(var key in EventEmitter.prototype) { + if(EventEmitter.prototype.hasOwnProperty(key)) { + pool[key] = EventEmitter.prototype[key]; + } + } + //monkey-patch with connect method + pool.connect = function(cb) { + pool.acquire(function(err, client) { + if(err) return cb(err, null, function() {/*NOOP*/}); + //support both 2 (old) and 3 arguments + (cb.length > 2 ? newConnect : oldConnect)(pool, client, cb); + }); + }; + return pool; + } +}; + +//the old connect method of the pool +//would automatically subscribe to the 'drain' +//event and automatically return the client to +//the pool once 'drain' fired once. This caused +//a bunch of problems, but for backwards compatibility +//we're leaving it in +var alarmDuration = 5000; +var errorMessage = [ + 'A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.', + 'You might have a leak!', + 'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {', + ' //do something', + ' done(); //call done() to signal you are finished with the client', + '}' +].join(require('os').EOL); + +var oldConnect = function(pool, client, cb) { + var tid = setTimeout(function() { + console.error(errorMessage); + }, alarmDuration); + var onError = function() { + clearTimeout(tid); + client.removeListener('drain', release); + }; + var release = function() { + clearTimeout(tid); + pool.release(client); + client.removeListener('error', onError); + }; + client.once('drain', release); + client.once('error', onError); + cb(null, client); +}; + +var newConnect = function(pool, client, cb) { + cb(null, client, function(err) { + if(err) { + pool.destroy(client); + } else { + pool.release(client); + } + }); +}; + +module.exports = pools; diff --git a/test/integration/connection-pool/optional-config-tests.js b/test/integration/connection-pool/optional-config-tests.js index d3ddc509..690be7f2 100644 --- a/test/integration/connection-pool/optional-config-tests.js +++ b/test/integration/connection-pool/optional-config-tests.js @@ -10,5 +10,11 @@ helper.pg.defaults.poolSize = 1; helper.pg.connect(assert.calls(function(err, client) { assert.isNull(err); - client.end(); + client.query('SELECT NOW()'); + client.once('drain', function() { + setTimeout(function() { + helper.pg.end(); + + }, 10); + }); })); diff --git a/test/integration/connection-pool/test-helper.js b/test/integration/connection-pool/test-helper.js index cc86677d..199407cd 100644 --- a/test/integration/connection-pool/test-helper.js +++ b/test/integration/connection-pool/test-helper.js @@ -9,7 +9,7 @@ helper.testPoolSize = function(max) { for(var i = 0; i < max; i++) { helper.pg.poolSize = 10; test("connection #" + i + " executes", function() { - helper.pg.connect(helper.config, function(err, client) { + helper.pg.connect(helper.config, function(err, client, done) { assert.isNull(err); client.query("select * from person", function(err, result) { assert.lengthIs(result.rows, 26) @@ -19,7 +19,8 @@ helper.testPoolSize = function(max) { }) var query = client.query("SELECT * FROM NOW()") query.on('end',function() { - sink.add() + sink.add(); + done(); }) }) }) diff --git a/test/integration/connection-pool/unique-name-tests.js b/test/integration/connection-pool/unique-name-tests.js deleted file mode 100644 index a92a0041..00000000 --- a/test/integration/connection-pool/unique-name-tests.js +++ /dev/null @@ -1,63 +0,0 @@ -var helper = require(__dirname + '/test-helper'); - -helper.pg.defaults.poolSize = 1; -helper.pg.defaults.user = helper.args.user; -helper.pg.defaults.password = helper.args.password; -helper.pg.defaults.database = helper.args.database; -helper.pg.defaults.port = helper.args.port; -helper.pg.defaults.host = helper.args.host; -helper.pg.defaults.binary = helper.args.binary; -helper.pg.defaults.poolIdleTimeout = 100; - -var moreArgs = {}; -for (c in helper.config) { - moreArgs[c] = helper.config[c]; -} -moreArgs.zomg = true; - -var badArgs = {}; -for (c in helper.config) { - badArgs[c] = helper.config[c]; -} - -badArgs.user = badArgs.user + 'laksdjfl'; -badArgs.password = badArgs.password + 'asldkfjlas'; -badArgs.zomg = true; - -test('connecting with complete config', function() { - - helper.pg.connect(helper.config, assert.calls(function(err, client) { - assert.isNull(err); - client.iGotAccessed = true; - client.query("SELECT NOW()") - })); - -}); - -test('connecting with different config object', function() { - - helper.pg.connect(moreArgs, assert.calls(function(err, client) { - assert.isNull(err); - assert.ok(client.iGotAccessed === true) - client.query("SELECT NOW()"); - })) - -}); - -test('connecting with all defaults', function() { - - helper.pg.connect(assert.calls(function(err, client) { - assert.isNull(err); - assert.ok(client.iGotAccessed === true); - client.end(); - })); - -}); - -test('connecting with invalid config', function() { - - helper.pg.connect(badArgs, assert.calls(function(err, client) { - assert.ok(err != null, "Expected connection error using invalid connection credentials"); - })); - -}); diff --git a/test/unit/pool/basic-tests.js b/test/unit/pool/basic-tests.js new file mode 100644 index 00000000..b96937ee --- /dev/null +++ b/test/unit/pool/basic-tests.js @@ -0,0 +1,192 @@ +var util = require('util'); +var EventEmitter = require('events').EventEmitter; + +var libDir = __dirname + '/../../../lib'; +var defaults = require(libDir + '/defaults'); +var pools = require(libDir + '/pool'); +var poolId = 0; + +require(__dirname + '/../../test-helper'); + +var FakeClient = function() { + EventEmitter.call(this); +} + +util.inherits(FakeClient, EventEmitter); + +FakeClient.prototype.connect = function(cb) { + process.nextTick(cb); +} + +FakeClient.prototype.end = function() { + this.endCalled = true; +} + +//Hangs the event loop until 'end' is called on client +var HangingClient = function(config) { + EventEmitter.call(this); + this.config = config; +} + +util.inherits(HangingClient, EventEmitter); + +HangingClient.prototype.connect = function(cb) { + this.intervalId = setInterval(function() { + console.log('hung client...'); + }, 1000); + process.nextTick(cb); +} + +HangingClient.prototype.end = function() { + clearInterval(this.intervalId); +} + +pools.Client = FakeClient; + +test('no pools exist', function() { + assert.empty(Object.keys(pools.all)); +}); + +test('pool creates pool on miss', function() { + var p = pools.getOrCreate(); + assert.ok(p); + assert.equal(Object.keys(pools.all).length, 1); + var p2 = pools.getOrCreate(); + assert.equal(p, p2); + assert.equal(Object.keys(pools.all).length, 1); + var p3 = pools.getOrCreate("pg://postgres:password@localhost:5432/postgres"); + assert.notEqual(p, p3); + assert.equal(Object.keys(pools.all).length, 2); +}); + +test('pool follows defaults', function() { + var p = pools.getOrCreate(poolId++); + for(var i = 0; i < 100; i++) { + p.acquire(function(err, client) { + }); + } + assert.equal(p.getPoolSize(), defaults.poolSize); +}); + +test('pool#connect with 2 parameters (legacy, for backwards compat)', function() { + var p = pools.getOrCreate(poolId++); + p.connect(assert.success(function(client) { + assert.ok(client); + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 1); + client.emit('drain'); + assert.equal(p.availableObjectsCount(), 1); + assert.equal(p.getPoolSize(), 1); + p.destroyAllNow(); + })); +}); + +test('pool#connect with 3 parameters', function() { + var p = pools.getOrCreate(poolId++); + var tid = setTimeout(function() { + throw new Error("Connection callback was never called"); + }, 100); + p.connect(function(err, client, done) { + clearTimeout(tid); + assert.equal(err, null); + assert.ok(client); + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 1); + client.emit('drain'); + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 1); + done(); + assert.equal(p.availableObjectsCount(), 1); + assert.equal(p.getPoolSize(), 1); + p.destroyAllNow(); + }); +}); + +test('on client error, client is removed from pool', function() { + var p = pools.getOrCreate(poolId++); + p.connect(assert.success(function(client) { + assert.ok(client); + client.emit('drain'); + assert.equal(p.availableObjectsCount(), 1); + assert.equal(p.getPoolSize(), 1); + //error event fires on pool BEFORE pool.destroy is called with client + assert.emits(p, 'error', function(err) { + assert.equal(err.message, 'test error'); + assert.ok(!client.endCalled); + assert.equal(p.availableObjectsCount(), 1); + assert.equal(p.getPoolSize(), 1); + //after we're done in our callback, pool.destroy is called + process.nextTick(function() { + assert.ok(client.endCalled); + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + p.destroyAllNow(); + }); + }); + client.emit('error', new Error('test error')); + })); +}); + +test('pool with connection error on connection', function() { + pools.Client = function() { + return { + connect: function(cb) { + process.nextTick(function() { + cb(new Error('Could not connect')); + }); + } + }; + } + test('two parameters', function() { + var p = pools.getOrCreate(poolId++); + p.connect(assert.calls(function(err, client) { + assert.ok(err); + assert.equal(client, null); + //client automatically removed + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + })); + }); + test('three parameters', function() { + var p = pools.getOrCreate(poolId++); + var tid = setTimeout(function() { + assert.fail('Did not call connect callback'); + }, 100); + p.connect(function(err, client, done) { + clearTimeout(tid); + assert.ok(err); + assert.equal(client, null); + //done does nothing + done(new Error('OH NOOOO')); + done(); + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + }); + }); +}); + +test('returnning an error to done()', function() { + var p = pools.getOrCreate(poolId++); + pools.Client = FakeClient; + p.connect(function(err, client, done) { + assert.equal(err, null); + assert(client); + done(new Error("BROKEN")); + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + }); +}); + +test('fetching pool by object', function() { + var p = pools.getOrCreate({ + user: 'brian', + host: 'localhost', + password: 'password' + }); + var p2 = pools.getOrCreate({ + user: 'brian', + host: 'localhost', + password: 'password' + }); + assert.equal(p, p2); +}); diff --git a/test/unit/pool/timeout-tests.js b/test/unit/pool/timeout-tests.js new file mode 100644 index 00000000..0fc96b2d --- /dev/null +++ b/test/unit/pool/timeout-tests.js @@ -0,0 +1,42 @@ +var util = require('util'); +var EventEmitter = require('events').EventEmitter; + +var libDir = __dirname + '/../../../lib'; +var defaults = require(libDir + '/defaults'); +var pools = require(libDir + '/pool'); +var poolId = 0; + +require(__dirname + '/../../test-helper'); + +var FakeClient = function() { + EventEmitter.call(this); +} + +util.inherits(FakeClient, EventEmitter); + +FakeClient.prototype.connect = function(cb) { + process.nextTick(cb); +} + +FakeClient.prototype.end = function() { + this.endCalled = true; +} + +defaults.poolIdleTimeout = 10; +defaults.reapIntervalMillis = 10; + +test('client times out from idle', function() { + pools.Client = FakeClient; + var p = pools.getOrCreate(poolId++); + p.connect(function(err, client, done) { + done(); + }); + process.nextTick(function() { + assert.equal(p.availableObjectsCount(), 1); + assert.equal(p.getPoolSize(), 1); + setTimeout(function() { + assert.equal(p.availableObjectsCount(), 0); + assert.equal(p.getPoolSize(), 0); + }, 50); + }); +});