initial working connection pool implementation & tests

This commit is contained in:
Brian Carlson 2010-12-13 17:21:40 -06:00
parent 618c268788
commit b3e56afea2
8 changed files with 117 additions and 92 deletions

View File

@ -4,6 +4,36 @@ var net = require('net');
var Pool = require(__dirname + '/utils').Pool;
var Client = require(__dirname+'/client');
var defaults = require(__dirname + '/defaults');
//wrap up common connection management boilerplate
var connect = function(config, callback) {
if(poolEnabled()) {
return getPooledClient(config, callback)
}
throw new Error("Should be testing pools")
var client = new Client(config);
client.connect();
var onError = function(error) {
client.connection.removeListener('readyForQuery', onReady);
callback(error);
}
var onReady = function() {
client.removeListener('error', onError);
callback(null, client);
client.on('drain', client.end.bind(client));
}
client.once('error', onError);
//TODO refactor
//i don't like reaching into the client's connection for attaching
//to specific events here
client.connection.once('readyForQuery', onReady);
}
//connection pool global cache
var clientPools = {
}
@ -13,12 +43,13 @@ var poolEnabled = function() {
}
var log = function() {
//do nothing
}
var log = function() {
console.log.apply(console, arguments);
}
//for testing
// var log = function() {
// console.log.apply(console, arguments);
// }
var getPooledClient = function(config, callback) {
//lookup pool using config as key
@ -35,8 +66,9 @@ var getPooledClient = function(config, callback) {
return client;
})
}
pool.checkOut(function(err, client) {
//if client already connected just
//pass it along to the callback and return
if(client.connected) {
@ -72,56 +104,36 @@ var getPooledClient = function(config, callback) {
}
//destroys the world
var end = function(callback) {
for(var name in clientPools) {
var pool = clientPools[name];
log("destroying pool %s", name);
pool.waits.forEach(function(wait) {
wait(new Error("Client is being destroyed"))
})
pool.waits = [];
pool.items.forEach(function(item) {
var client = item.ref;
if(client.activeQuery) {
log("client is still active, waiting for it to complete");
client.on('drain', client.end.bind(client))
} else {
client.end();
}
})
//remove reference to pool lookup
clientPools[name] = null;
delete(clientPools[name])
//or optionally only a single pool
//mostly used for testing or
//a hard shutdown
var end = function(name) {
if(!name) {
for(var poolName in clientPools) {
end(poolName)
return
}
}
var pool = clientPools[name];
log("destroying pool %s", name);
pool.waits.forEach(function(wait) {
wait(new Error("Client is being destroyed"))
})
pool.waits = [];
pool.items.forEach(function(item) {
var client = item.ref;
if(client.activeQuery) {
log("client is still active, waiting for it to complete");
client.on('drain', client.end.bind(client))
} else {
client.end();
}
})
//remove reference to pool lookup
clientPools[name] = null;
delete(clientPools[name])
}
//wrap up common connection management boilerplate
var connect = function(config, callback) {
// if(poolEnabled()) {
// return getPooledClient(config, callback)
// }
var client = new Client(config);
client.connect();
var onError = function(error) {
client.connection.removeListener('readyForQuery', onReady);
callback(error);
}
var onReady = function() {
client.removeListener('error', onError);
callback(null, client);
client.on('drain', client.end.bind(client));
}
client.once('error', onError);
//TODO refactor
//i don't like reaching into the client's connection for attaching
//to specific events here
client.connection.once('readyForQuery', onReady);
}
module.exports = {
Client: Client,

View File

@ -1,9 +1,15 @@
var helper = require(__dirname + '/../test-helper');
var pg = require(__dirname + '/../../../lib');
var connectionString = helper.connectionString(__filename);
var sink = new helper.Sink(2, function() {
pg.end(connectionString);
});
test('api', function() {
pg.connect(helper.args, assert.calls(function(err, client) {
assert.equal(err, null, "Failed to connect");
pg.connect(connectionString, assert.calls(function(err, client) {
assert.equal(err, null, "Failed to connect: " + sys.inspect(err));
client.query('CREATE TEMP TABLE band(name varchar(100))');
@ -30,6 +36,7 @@ test('api', function() {
assert.length(result.rows, 2);
assert.equal(result.rows.pop().name, 'the flaming lips');
assert.equal(result.rows.pop().name, 'the beach boys');
sink.add();
}))
}))
@ -37,12 +44,14 @@ test('api', function() {
})
test('executing nested queries', function() {
pg.connect(helper.args, assert.calls(function(err, client) {
pg.connect(connectionString, assert.calls(function(err, client) {
assert.isNull(err);
client.query('select now as now from NOW()', assert.calls(function(err, result) {
assert.equal(new Date().getYear(), result.rows[0].now.getYear())
client.query('select now as now_again FROM NOW()', assert.calls(function() {
client.query('select * FROM NOW()', assert.calls(function() {
assert.ok('all queries hit')
sink.add();
}))
}))
}))
@ -55,5 +64,3 @@ test('raises error if cannot connect', function() {
assert.ok(err, 'should have raised an error')
}))
})
pg.end();

View File

@ -12,5 +12,7 @@ module.exports = {
});
client.connect();
return client;
}
},
connectionString: helper.connectionString,
Sink: helper.Sink
};

View File

@ -0,0 +1,2 @@
var helper = require(__dirname + "/test-helper")
helper.testPoolSize(2);

View File

@ -0,0 +1,3 @@
var helper = require(__dirname + "/test-helper")
helper.testPoolSize(10);
helper.testPoolSize(11);

View File

@ -1,32 +1,2 @@
var helper = require(__dirname + "/test-helper")
setTimeout(function() {
helper.pg.defaults.poolSize = 10;
test('executes a single pooled connection/query', function() {
var args = helper.args;
var conString = "pg://"+args.user+":"+args.password+"@"+args.host+":"+args.port+"/"+args.database;
var queryCount = 0;
helper.pg.connect(conString, assert.calls(function(err, client) {
assert.isNull(err);
client.query("select * from NOW()", assert.calls(function(err, result) {
assert.isNull(err);
queryCount++;
}))
}))
var id = setTimeout(function() {
assert.equal(queryCount, 1)
}, 1000)
var check = function() {
setTimeout(function() {
if(queryCount == 1) {
clearTimeout(id)
helper.pg.end();
} else {
check();
}
}, 50)
}
check();
})
}, 1000)
helper.testPoolSize(1);

View File

@ -1,4 +1,31 @@
module.exports = {
args: require(__dirname + "/../test-helper").args,
pg: require(__dirname + "/../../../lib")
var helper = require(__dirname + "/../test-helper");
var pg = require(__dirname + "/../../../lib");
helper.pg = pg;
var testPoolSize = function(max) {
var conString = helper.connectionString();
var sink = new helper.Sink(max, function() {
helper.pg.end(conString);
});
test("can pool " + max + " times", function() {
for(var i = 0; i < max; i++) {
helper.pg.poolSize = 10;
helper.pg.connect(conString, function(err, client) {
assert.isNull(err);
client.query("select * from NOW()", function() {
sink.add();
})
})
}
})
}
module.exports = {
args: helper.args,
pg: helper.pg,
connectionString: helper.connectionString,
Sink: helper.Sink,
testPoolSize: testPoolSize
}

View File

@ -0,0 +1,2 @@
var helper = require(__dirname + "/test-helper")
helper.testPoolSize(200);