remove custom pool code

This commit is contained in:
brianc 2011-08-11 21:30:10 -05:00
parent 4cb97a2b9e
commit aa63f50437
4 changed files with 1 additions and 262 deletions

View File

@ -1,100 +0,0 @@
var Pool = require(__dirname + '/utils').Pool;
var defaults = require(__dirname + '/defaults');
module.exports = {
init: function(Client) {
//connection pool global cache
var clientPools = {
}
var connect = function(config, callback) {
var poolName = config;
if(typeof config === 'function') {
callback = config;
config = defaults;
}
//get unique pool name
if(typeof config === 'object') {
poolName = config.user + config.database + config.host + config.port;
}
var pool = clientPools[poolName];
//create pool if doesn't exist
if(!pool) {
pool = clientPools[poolName] = new Pool(defaults.poolSize, function() {
var client = new Client(config);
client.connected = false;
return client;
})
}
pool.checkOut(function(err, client) {
//if client already connected just
//pass it along to the callback and return
if(client.connected) {
callback(null, client);
return;
}
var onError = function(error) {
client.removeListener('connect', onReady);
callback(error);
pool.checkIn(client);
}
var onReady = function() {
client.removeListener('error', onError);
client.connected = true;
callback(null, client);
client.on('drain', function() {
pool.checkIn(client);
});
}
client.once('error', onError);
client.once('connect', onReady);
client.connect();
});
}
//destroys the world
//or optionally only a single pool
//mostly used for testing or
//a hard shutdown
var end = function(name) {
if(!name) {
for(var poolName in clientPools) {
end(poolName)
return
}
}
var pool = clientPools[name];
//log("destroying pool %s", name);
pool.waits.forEach(function(wait) {
wait(new Error("Client is being destroyed"))
})
pool.waits = [];
pool.items.forEach(function(item) {
var client = item.ref;
if(client.activeQuery) {
//log("client is still active, waiting for it to complete");
client.on('drain', client.end.bind(client))
} else {
client.end();
}
})
//remove reference to pool lookup
clientPools[name] = null;
delete(clientPools[name])
};
//export functions with closures to client constructor
return {
connect: connect,
end: end
}
}
};

View File

@ -176,6 +176,4 @@ p.handleReadyForQuery = function() {
this.emit('end');
};
var pool = require(__dirname + '/client-pool').init(ctor);
module.exports = ctor;

View File

@ -3,6 +3,7 @@ var defaults = require(__dirname + "/defaults");
var events = require('events');
var sys = require('sys');
//compatibility for old nodes
if(typeof events.EventEmitter.prototype.once !== 'function') {
events.EventEmitter.prototype.once = function (type, listener) {
var self = this;
@ -13,69 +14,6 @@ if(typeof events.EventEmitter.prototype.once !== 'function') {
};
}
var Pool = function(maxSize, createFn) {
events.EventEmitter.call(this);
this.maxSize = maxSize;
this.createFn = createFn;
this.items = [];
this.waits = [];
}
sys.inherits(Pool, events.EventEmitter);
var p = Pool.prototype;
p.checkOut = function(callback) {
if(!this.maxSize) {
return callback(null, this.createFn());
}
var len = 0;
for(var i = 0, len = this.items.length; i < len; i++) {
var item = this.items[i];
if(item.checkedIn) {
return this._pulse(item, callback);
}
}
//check if we can create a new item
if(this.items.length < this.maxSize && this.createFn) {
var result = this.createFn();
var item = {ref: result, checkedIn: true}
this.items.push(item);
if(item.checkedIn) {
return this._pulse(item, callback)
}
}
this.waits.push(callback);
return false; //did not execute sync
}
p.checkIn = function(item) {
//scan current items
for(var i = 0, len = this.items.length; i < len; i++) {
var currentItem = this.items[i];
if(currentItem.ref == item) {
currentItem.checkedIn = true;
this._pulse(currentItem);
return true;
}
}
//add new item
var newItem = {ref: item, checkedIn: true};
this.items.push(newItem);
this._pulse(newItem);
return false;
}
p._pulse = function(item, cb) {
cb = cb || this.waits.shift()
if(cb) {
item.checkedIn = false;
cb(null, item.ref)
return true;
}
return false;
};
var parseConnectionString = function(str) {
//unix socket
if(str.charAt(0) === '/') {
@ -155,7 +93,6 @@ var getLibpgConString = function(config, callback) {
}
module.exports = {
Pool: Pool,
normalizeConnectionInfo: normalizeConnectionInfo,
//only exported here to make testing of this method possible
//since it contains quite a bit of logic and testing for

View File

@ -1,6 +1,5 @@
require(__dirname + '/test-helper');
var utils = require(__dirname + "/../../lib/utils");
var Pool = utils.Pool;
var defaults = require(__dirname + "/../../lib").defaults;
//this tests the monkey patching
@ -21,101 +20,6 @@ test("EventEmitter.once", function() {
assert.equal(callCount, 1);
});
test('an empty pool', function() {
test('with no creation method', function() {
var pool = new Pool(10);
var brian = {name:'brian'};
test('can set and get an item', function() {
pool.checkIn(brian);
var sync = pool.checkOut(assert.calls(function(err, item) {
assert.equal(brian, item)
assert.same(brian, item)
}))
assert.ok(sync, "should have fired sync")
})
test('checkout blocks until item checked back in', function() {
var called = false;
var sync = pool.checkOut(assert.calls(function(err, item) {
called = true;
assert.equal(brian, item)
assert.same(brian, item)
}))
assert.ok(sync === false, "Should not have fired sync")
assert.ok(called === false, "Should not have fired callback yet")
pool.checkIn(brian)
})
})
test('with a creation method', function() {
var customName = "first";
var callCount = 0;
var pool = new Pool(3, function() {
return {name: customName + (++callCount)};
});
test('creates if pool is not at max size', function() {
var sync = pool.checkOut(assert.calls(function(err, item) {
assert.equal(item.name, "first1");
}))
assert.ok(sync, "Should have generated item & called callback in sync")
})
test('creates again if item is checked out', function() {
var sync = pool.checkOut(assert.calls(function(err, item) {
assert.equal(item.name, "first2")
}))
assert.ok(sync, "Should have called in sync again")
})
var external = {name: 'boom'};
test('can add another item', function() {
pool.checkIn(external)
var sync = pool.checkOut(assert.calls(function(err, item) {
assert.equal(item.name, 'boom')
}))
assert.ok(sync, "Should have fired 3rd in sync")
})
test('after pool is full, create is not called again', function() {
var called = false;
var sync = pool.checkOut(assert.calls(function(err, item) {
called = true;
assert.equal(item.name, 'boom')
}))
assert.ok(sync === false, "should not be sync")
assert.ok(called === false, "should not have called callback")
pool.checkIn(external);
})
})
})
test('a pool with size of zero', function() {
var index = 0;
var pool = new Pool(0, function() {
return index++;
})
test('checkin does nothing', function() {
index = 0;
pool.checkIn(301813);
assert.equal(pool.checkOut(assert.calls(function(err, item) {
assert.equal(item, 0);
})));
})
test('always creates a new item', function() {
index = 0;
pool.checkOut(assert.calls(function(err, item) {
assert.equal(item, 0);
}))
pool.checkOut(assert.calls(function(err, item) {
assert.equal(item, 1);
}))
pool.checkOut(assert.calls(function(err, item) {
assert.equal(item, 2);
}))
})
})
test('normalizing connection info', function() {
test('with objects', function() {