Merge pull request #274 from brianc/pool

Connection Pool refactor
This commit is contained in:
Brian C 2013-02-21 18:34:23 -08:00
commit 0f41e65619
7 changed files with 371 additions and 122 deletions

View File

@ -2,26 +2,26 @@ var EventEmitter = require('events').EventEmitter;
var util = require('util');
var Client = require(__dirname+'/client');
var defaults = require(__dirname + '/defaults');
//external genericPool module
var genericPool = require('generic-pool');
//cache of existing client pools
var pools = {};
var pool = require(__dirname + '/pool');
var types = require(__dirname + '/types');
var Connection = require(__dirname + '/connection');
var PG = function(clientConstructor) {
EventEmitter.call(this);
this.Client = clientConstructor;
this.Connection = require(__dirname + '/connection');
this.Query = clientConstructor.Query;
this.defaults = defaults;
this.Client = pool.Client = clientConstructor;
this.Query = this.Client.Query;
this.pools = pool;
this.types = types;
this.Connection = Connection;
};
util.inherits(PG, EventEmitter);
PG.prototype.end = function() {
Object.keys(pools).forEach(function(name) {
var pool = pools[name];
var self = this;
Object.keys(self.pools.all).forEach(function(key) {
var pool = self.pools.all[key];
pool.drain(function() {
pool.destroyAllNow();
});
@ -29,51 +29,16 @@ PG.prototype.end = function() {
};
PG.prototype.connect = function(config, callback) {
var self = this;
var c = config;
var cb = callback;
//allow for no config to be passed
if(typeof c === 'function') {
cb = c;
c = defaults;
if(typeof config == "function") {
callback = config;
config = null;
}
var pool = this.pools.getOrCreate(config);
pool.connect(callback);
if(!pool.listeners('error').length) {
//propagate errors up to pg object
pool.on('error', this.emit.bind(this, 'error'));
}
//get unique pool name even if object was used as config
var poolName = typeof(c) === 'string' ? c : c.user+c.host+c.port+c.database;
var pool = pools[poolName];
if(pool) { return pool.acquire(cb); }
pool = pools[poolName] = genericPool.Pool({
name: poolName,
create: function(callback) {
var client = new self.Client(c);
client.connect(function(err) {
if(err) { return callback(err); }
//handle connected client background errors by emitting event
//via the pg object and then removing errored client from the pool
client.on('error', function(e) {
self.emit('error', e, client);
pool.destroy(client);
});
callback(null, client);
});
client.on('drain', function() {
pool.release(client);
});
},
destroy: function(client) {
client.end();
},
max: defaults.poolSize,
idleTimeoutMillis: defaults.poolIdleTimeout,
reapIntervalMillis: defaults.reapIntervalMillis,
log: defaults.poolLog
});
return pool.acquire(cb);
};
// cancel the query runned by the given client
@ -96,4 +61,3 @@ module.exports.__defineGetter__("native", function() {
return module.exports.native;
});
module.exports.types = require('./types');

107
lib/pool.js Normal file
View File

@ -0,0 +1,107 @@
var EventEmitter = require('events').EventEmitter;
var defaults = require(__dirname + '/defaults');
var genericPool = require('generic-pool');
var pools = {
//dictionary of all key:pool pairs
all: {},
//reference to the client constructor - can override in tests or for require('pg').native
Client: require(__dirname + '/client'),
getOrCreate: function(clientConfig) {
clientConfig = clientConfig || {};
var name = JSON.stringify(clientConfig);
var pool = pools.all[name];
if(pool) {
return pool;
}
pool = genericPool.Pool({
name: name,
max: defaults.poolSize,
idleTimeoutMillis: defaults.poolIdleTimeout,
reapIntervalMillis: defaults.reapIntervalMillis,
log: defaults.poolLog,
create: function(cb) {
var client = new pools.Client(clientConfig);
client.connect(function(err) {
if(err) return cb(err, null);
//handle connected client background errors by emitting event
//via the pg object and then removing errored client from the pool
client.on('error', function(e) {
pool.emit('error', e, client);
pool.destroy(client);
});
return cb(null, client);
});
},
destroy: function(client) {
client.end();
}
});
pools.all[name] = pool;
//mixin EventEmitter to pool
EventEmitter.call(pool);
for(var key in EventEmitter.prototype) {
if(EventEmitter.prototype.hasOwnProperty(key)) {
pool[key] = EventEmitter.prototype[key];
}
}
//monkey-patch with connect method
pool.connect = function(cb) {
pool.acquire(function(err, client) {
if(err) return cb(err, null, function() {/*NOOP*/});
//support both 2 (old) and 3 arguments
(cb.length > 2 ? newConnect : oldConnect)(pool, client, cb);
});
};
return pool;
}
};
//the old connect method of the pool
//would automatically subscribe to the 'drain'
//event and automatically return the client to
//the pool once 'drain' fired once. This caused
//a bunch of problems, but for backwards compatibility
//we're leaving it in
var alarmDuration = 5000;
var errorMessage = [
'A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.',
'You might have a leak!',
'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {',
' //do something',
' done(); //call done() to signal you are finished with the client',
'}'
].join(require('os').EOL);
var oldConnect = function(pool, client, cb) {
var tid = setTimeout(function() {
console.error(errorMessage);
}, alarmDuration);
var onError = function() {
clearTimeout(tid);
client.removeListener('drain', release);
};
var release = function() {
clearTimeout(tid);
pool.release(client);
client.removeListener('error', onError);
};
client.once('drain', release);
client.once('error', onError);
cb(null, client);
};
var newConnect = function(pool, client, cb) {
cb(null, client, function(err) {
if(err) {
pool.destroy(client);
} else {
pool.release(client);
}
});
};
module.exports = pools;

View File

@ -10,5 +10,11 @@ helper.pg.defaults.poolSize = 1;
helper.pg.connect(assert.calls(function(err, client) {
assert.isNull(err);
client.end();
client.query('SELECT NOW()');
client.once('drain', function() {
setTimeout(function() {
helper.pg.end();
}, 10);
});
}));

View File

@ -9,7 +9,7 @@ helper.testPoolSize = function(max) {
for(var i = 0; i < max; i++) {
helper.pg.poolSize = 10;
test("connection #" + i + " executes", function() {
helper.pg.connect(helper.config, function(err, client) {
helper.pg.connect(helper.config, function(err, client, done) {
assert.isNull(err);
client.query("select * from person", function(err, result) {
assert.lengthIs(result.rows, 26)
@ -19,7 +19,8 @@ helper.testPoolSize = function(max) {
})
var query = client.query("SELECT * FROM NOW()")
query.on('end',function() {
sink.add()
sink.add();
done();
})
})
})

View File

@ -1,63 +0,0 @@
var helper = require(__dirname + '/test-helper');
helper.pg.defaults.poolSize = 1;
helper.pg.defaults.user = helper.args.user;
helper.pg.defaults.password = helper.args.password;
helper.pg.defaults.database = helper.args.database;
helper.pg.defaults.port = helper.args.port;
helper.pg.defaults.host = helper.args.host;
helper.pg.defaults.binary = helper.args.binary;
helper.pg.defaults.poolIdleTimeout = 100;
var moreArgs = {};
for (c in helper.config) {
moreArgs[c] = helper.config[c];
}
moreArgs.zomg = true;
var badArgs = {};
for (c in helper.config) {
badArgs[c] = helper.config[c];
}
badArgs.user = badArgs.user + 'laksdjfl';
badArgs.password = badArgs.password + 'asldkfjlas';
badArgs.zomg = true;
test('connecting with complete config', function() {
helper.pg.connect(helper.config, assert.calls(function(err, client) {
assert.isNull(err);
client.iGotAccessed = true;
client.query("SELECT NOW()")
}));
});
test('connecting with different config object', function() {
helper.pg.connect(moreArgs, assert.calls(function(err, client) {
assert.isNull(err);
assert.ok(client.iGotAccessed === true)
client.query("SELECT NOW()");
}))
});
test('connecting with all defaults', function() {
helper.pg.connect(assert.calls(function(err, client) {
assert.isNull(err);
assert.ok(client.iGotAccessed === true);
client.end();
}));
});
test('connecting with invalid config', function() {
helper.pg.connect(badArgs, assert.calls(function(err, client) {
assert.ok(err != null, "Expected connection error using invalid connection credentials");
}));
});

View File

@ -0,0 +1,192 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var libDir = __dirname + '/../../../lib';
var defaults = require(libDir + '/defaults');
var pools = require(libDir + '/pool');
var poolId = 0;
require(__dirname + '/../../test-helper');
var FakeClient = function() {
EventEmitter.call(this);
}
util.inherits(FakeClient, EventEmitter);
FakeClient.prototype.connect = function(cb) {
process.nextTick(cb);
}
FakeClient.prototype.end = function() {
this.endCalled = true;
}
//Hangs the event loop until 'end' is called on client
var HangingClient = function(config) {
EventEmitter.call(this);
this.config = config;
}
util.inherits(HangingClient, EventEmitter);
HangingClient.prototype.connect = function(cb) {
this.intervalId = setInterval(function() {
console.log('hung client...');
}, 1000);
process.nextTick(cb);
}
HangingClient.prototype.end = function() {
clearInterval(this.intervalId);
}
pools.Client = FakeClient;
test('no pools exist', function() {
assert.empty(Object.keys(pools.all));
});
test('pool creates pool on miss', function() {
var p = pools.getOrCreate();
assert.ok(p);
assert.equal(Object.keys(pools.all).length, 1);
var p2 = pools.getOrCreate();
assert.equal(p, p2);
assert.equal(Object.keys(pools.all).length, 1);
var p3 = pools.getOrCreate("pg://postgres:password@localhost:5432/postgres");
assert.notEqual(p, p3);
assert.equal(Object.keys(pools.all).length, 2);
});
test('pool follows defaults', function() {
var p = pools.getOrCreate(poolId++);
for(var i = 0; i < 100; i++) {
p.acquire(function(err, client) {
});
}
assert.equal(p.getPoolSize(), defaults.poolSize);
});
test('pool#connect with 2 parameters (legacy, for backwards compat)', function() {
var p = pools.getOrCreate(poolId++);
p.connect(assert.success(function(client) {
assert.ok(client);
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 1);
client.emit('drain');
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
p.destroyAllNow();
}));
});
test('pool#connect with 3 parameters', function() {
var p = pools.getOrCreate(poolId++);
var tid = setTimeout(function() {
throw new Error("Connection callback was never called");
}, 100);
p.connect(function(err, client, done) {
clearTimeout(tid);
assert.equal(err, null);
assert.ok(client);
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 1);
client.emit('drain');
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 1);
done();
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
p.destroyAllNow();
});
});
test('on client error, client is removed from pool', function() {
var p = pools.getOrCreate(poolId++);
p.connect(assert.success(function(client) {
assert.ok(client);
client.emit('drain');
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
//error event fires on pool BEFORE pool.destroy is called with client
assert.emits(p, 'error', function(err) {
assert.equal(err.message, 'test error');
assert.ok(!client.endCalled);
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
//after we're done in our callback, pool.destroy is called
process.nextTick(function() {
assert.ok(client.endCalled);
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
p.destroyAllNow();
});
});
client.emit('error', new Error('test error'));
}));
});
test('pool with connection error on connection', function() {
pools.Client = function() {
return {
connect: function(cb) {
process.nextTick(function() {
cb(new Error('Could not connect'));
});
}
};
}
test('two parameters', function() {
var p = pools.getOrCreate(poolId++);
p.connect(assert.calls(function(err, client) {
assert.ok(err);
assert.equal(client, null);
//client automatically removed
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
}));
});
test('three parameters', function() {
var p = pools.getOrCreate(poolId++);
var tid = setTimeout(function() {
assert.fail('Did not call connect callback');
}, 100);
p.connect(function(err, client, done) {
clearTimeout(tid);
assert.ok(err);
assert.equal(client, null);
//done does nothing
done(new Error('OH NOOOO'));
done();
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
});
});
});
test('returnning an error to done()', function() {
var p = pools.getOrCreate(poolId++);
pools.Client = FakeClient;
p.connect(function(err, client, done) {
assert.equal(err, null);
assert(client);
done(new Error("BROKEN"));
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
});
});
test('fetching pool by object', function() {
var p = pools.getOrCreate({
user: 'brian',
host: 'localhost',
password: 'password'
});
var p2 = pools.getOrCreate({
user: 'brian',
host: 'localhost',
password: 'password'
});
assert.equal(p, p2);
});

View File

@ -0,0 +1,42 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var libDir = __dirname + '/../../../lib';
var defaults = require(libDir + '/defaults');
var pools = require(libDir + '/pool');
var poolId = 0;
require(__dirname + '/../../test-helper');
var FakeClient = function() {
EventEmitter.call(this);
}
util.inherits(FakeClient, EventEmitter);
FakeClient.prototype.connect = function(cb) {
process.nextTick(cb);
}
FakeClient.prototype.end = function() {
this.endCalled = true;
}
defaults.poolIdleTimeout = 10;
defaults.reapIntervalMillis = 10;
test('client times out from idle', function() {
pools.Client = FakeClient;
var p = pools.getOrCreate(poolId++);
p.connect(function(err, client, done) {
done();
});
process.nextTick(function() {
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
setTimeout(function() {
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
}, 50);
});
});