finish out the first rev of the improved pool api

This commit is contained in:
bmc 2013-02-19 19:34:28 -06:00
parent 971eb5d1ef
commit bb448fe61a
3 changed files with 157 additions and 19 deletions

View File

@ -1,10 +1,12 @@
var EventEmitter = require('events').EventEmitter;
var defaults = require(__dirname + '/defaults');
var genericPool = require('generic-pool');
//takes the same config structure as client
var createPool = function(config) {
config = config || {};
var name = JSON.stringify(config);
var createPool = function(clientConfig) {
clientConfig = clientConfig || {};
var name = JSON.stringify(clientConfig);
var pool = createPool.all[name];
if(pool) {
return pool;
@ -12,10 +14,22 @@ var createPool = function(config) {
pool = genericPool.Pool({
name: name,
max: defaults.poolSize,
idleTimeoutMillis: defaults.poolIdleTimeout,
reapIntervalMillis: defaults.reapIntervalMillis,
log: defaults.poolLog,
create: function(cb) {
var client = new createPool.Client(config);
var client = new createPool.Client(clientConfig);
client.connect(function(err) {
return cb(err, client);
if(err) return cb(err, null);
//handle connected client background errors by emitting event
//via the pg object and then removing errored client from the pool
client.on('error', function(e) {
pool.emit('error', e, client);
pool.destroy(client);
});
return cb(null, client);
});
},
destroy: function(client) {
@ -23,20 +37,23 @@ var createPool = function(config) {
}
});
createPool.all[name] = pool;
//mixin EventEmitter to pool
EventEmitter.call(pool);
for(var key in EventEmitter.prototype) {
if(EventEmitter.prototype.hasOwnProperty(key)) {
pool[key] = EventEmitter.prototype[key];
}
}
//monkey-patch with connect method
pool.connect = function(cb) {
pool.acquire(function(err, client) {
//TODO: on connection error should we remove this client automatically?
if(err) {
return cb(err);
}
if(cb.length > 2) {
return newConnect(pool, client, cb);
}
return oldConnect(pool, client, cb);
if(err) return cb(err, null, function() {/*NOOP*/});
//support both 2 (old) and 3 arguments
(cb.length > 2 ? newConnect : oldConnect)(pool, client, cb);
});
};
return pool;
}
};
//the old connect method of the pool
//would automatically subscribe to the 'drain'
@ -44,7 +61,7 @@ var createPool = function(config) {
//the pool once 'drain' fired once. This caused
//a bunch of problems, but for backwards compatibility
//we're leaving it in
var alarmDuration = 1000;
var alarmDuration = 5000;
var errorMessage = ['A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.',
'You might have a leak!',
'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {',
@ -64,8 +81,12 @@ var oldConnect = function(pool, client, cb) {
};
var newConnect = function(pool, client, cb) {
cb(null, client, function() {
pool.release(client);
cb(null, client, function(err) {
if(err) {
pool.destroy(client);
} else {
pool.release(client);
}
});
};

View File

@ -19,7 +19,7 @@ FakeClient.prototype.connect = function(cb) {
}
FakeClient.prototype.end = function() {
this.endCalled = true;
}
//Hangs the event loop until 'end' is called on client
@ -59,7 +59,7 @@ test('pool creates pool on miss', function() {
assert.equal(Object.keys(pool.all).length, 2);
});
test('pool follows default limits', function() {
test('pool follows defaults', function() {
var p = pool(poolId++);
for(var i = 0; i < 100; i++) {
p.acquire(function(err, client) {
@ -101,3 +101,78 @@ test('pool#connect with 3 parameters', function() {
p.destroyAllNow();
});
});
test('on client error, client is removed from pool', function() {
var p = pool(poolId++);
p.connect(assert.success(function(client) {
assert.ok(client);
client.emit('drain');
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
//error event fires on pool BEFORE pool.destroy is called with client
assert.emits(p, 'error', function(err) {
assert.equal(err.message, 'test error');
assert.ok(!client.endCalled);
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
//after we're done in our callback, pool.destroy is called
process.nextTick(function() {
assert.ok(client.endCalled);
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
p.destroyAllNow();
});
});
client.emit('error', new Error('test error'));
}));
});
test('pool with connection error on connection', function() {
pool.Client = function() {
return {
connect: function(cb) {
process.nextTick(function() {
cb(new Error('Could not connect'));
});
}
};
}
test('two parameters', function() {
var p = pool(poolId++);
p.connect(assert.calls(function(err, client) {
assert.ok(err);
assert.equal(client, null);
//client automatically removed
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
}));
});
test('three parameters', function() {
var p = pool(poolId++);
var tid = setTimeout(function() {
assert.fail('Did not call connect callback');
}, 100);
p.connect(function(err, client, done) {
clearTimeout(tid);
assert.ok(err);
assert.equal(client, null);
//done does nothing
done(new Error('OH NOOOO'));
done();
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
});
});
});
test('returnning an error to done()', function() {
var p = pool(poolId++);
pool.Client = FakeClient;
p.connect(function(err, client, done) {
assert.equal(err, null);
assert(client);
done(new Error("BROKEN"));
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
});
});

View File

@ -0,0 +1,42 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var libDir = __dirname + '/../../../lib';
var defaults = require(libDir + '/defaults');
var pool = require(libDir + '/pool');
var poolId = 0;
require(__dirname + '/../../test-helper');
var FakeClient = function() {
EventEmitter.call(this);
}
util.inherits(FakeClient, EventEmitter);
FakeClient.prototype.connect = function(cb) {
process.nextTick(cb);
}
FakeClient.prototype.end = function() {
this.endCalled = true;
}
defaults.poolIdleTimeout = 10;
defaults.reapIntervalMillis = 10;
test('client times out from idle', function() {
pool.Client = FakeClient;
var p = pool(poolId++);
p.connect(function(err, client, done) {
done();
});
process.nextTick(function() {
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
setTimeout(function() {
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
}, 50);
});
});