From b3e56afea21c87f9eba901df14d078e38728941d Mon Sep 17 00:00:00 2001 From: Brian Carlson Date: Mon, 13 Dec 2010 17:21:40 -0600 Subject: [PATCH] initial working connection pool implementation & tests --- lib/index.js | 116 ++++++++++-------- test/integration/client/api-tests.js | 17 ++- test/integration/client/test-helper.js | 4 +- .../double-connection-tests.js | 2 + .../connection-pool/max-connection-tests.js | 3 + .../single-connection-tests.js | 32 +---- .../connection-pool/test-helper.js | 33 ++++- .../waiting-connection-tests.js | 2 + 8 files changed, 117 insertions(+), 92 deletions(-) diff --git a/lib/index.js b/lib/index.js index 3e36edbe..645bf0ea 100644 --- a/lib/index.js +++ b/lib/index.js @@ -4,6 +4,36 @@ var net = require('net'); var Pool = require(__dirname + '/utils').Pool; var Client = require(__dirname+'/client'); var defaults = require(__dirname + '/defaults'); + +//wrap up common connection management boilerplate +var connect = function(config, callback) { + if(poolEnabled()) { + return getPooledClient(config, callback) + } + throw new Error("Should be testing pools") + var client = new Client(config); + client.connect(); + + var onError = function(error) { + client.connection.removeListener('readyForQuery', onReady); + callback(error); + } + + var onReady = function() { + client.removeListener('error', onError); + callback(null, client); + client.on('drain', client.end.bind(client)); + } + + client.once('error', onError); + + //TODO refactor + //i don't like reaching into the client's connection for attaching + //to specific events here + client.connection.once('readyForQuery', onReady); +} + + //connection pool global cache var clientPools = { } @@ -13,12 +43,13 @@ var poolEnabled = function() { } var log = function() { - + //do nothing } -var log = function() { - console.log.apply(console, arguments); -} +//for testing +// var log = function() { +// console.log.apply(console, arguments); +// } var getPooledClient = function(config, callback) { //lookup pool using config as key @@ -35,8 +66,9 @@ var getPooledClient = function(config, callback) { return client; }) } - + pool.checkOut(function(err, client) { + //if client already connected just //pass it along to the callback and return if(client.connected) { @@ -72,56 +104,36 @@ var getPooledClient = function(config, callback) { } //destroys the world -var end = function(callback) { - for(var name in clientPools) { - var pool = clientPools[name]; - log("destroying pool %s", name); - pool.waits.forEach(function(wait) { - wait(new Error("Client is being destroyed")) - }) - pool.waits = []; - pool.items.forEach(function(item) { - var client = item.ref; - if(client.activeQuery) { - log("client is still active, waiting for it to complete"); - client.on('drain', client.end.bind(client)) - } else { - client.end(); - } - }) - //remove reference to pool lookup - clientPools[name] = null; - delete(clientPools[name]) +//or optionally only a single pool +//mostly used for testing or +//a hard shutdown +var end = function(name) { + if(!name) { + for(var poolName in clientPools) { + end(poolName) + return + } } + var pool = clientPools[name]; + log("destroying pool %s", name); + pool.waits.forEach(function(wait) { + wait(new Error("Client is being destroyed")) + }) + pool.waits = []; + pool.items.forEach(function(item) { + var client = item.ref; + if(client.activeQuery) { + log("client is still active, waiting for it to complete"); + client.on('drain', client.end.bind(client)) + } else { + client.end(); + } + }) + //remove reference to pool lookup + clientPools[name] = null; + delete(clientPools[name]) } -//wrap up common connection management boilerplate -var connect = function(config, callback) { -// if(poolEnabled()) { -// return getPooledClient(config, callback) -// } - - var client = new Client(config); - client.connect(); - - var onError = function(error) { - client.connection.removeListener('readyForQuery', onReady); - callback(error); - } - - var onReady = function() { - client.removeListener('error', onError); - callback(null, client); - client.on('drain', client.end.bind(client)); - } - - client.once('error', onError); - - //TODO refactor - //i don't like reaching into the client's connection for attaching - //to specific events here - client.connection.once('readyForQuery', onReady); -} module.exports = { Client: Client, diff --git a/test/integration/client/api-tests.js b/test/integration/client/api-tests.js index 4478b78b..22af61a2 100644 --- a/test/integration/client/api-tests.js +++ b/test/integration/client/api-tests.js @@ -1,9 +1,15 @@ var helper = require(__dirname + '/../test-helper'); var pg = require(__dirname + '/../../../lib'); +var connectionString = helper.connectionString(__filename); + + +var sink = new helper.Sink(2, function() { + pg.end(connectionString); +}); test('api', function() { - pg.connect(helper.args, assert.calls(function(err, client) { - assert.equal(err, null, "Failed to connect"); + pg.connect(connectionString, assert.calls(function(err, client) { + assert.equal(err, null, "Failed to connect: " + sys.inspect(err)); client.query('CREATE TEMP TABLE band(name varchar(100))'); @@ -30,6 +36,7 @@ test('api', function() { assert.length(result.rows, 2); assert.equal(result.rows.pop().name, 'the flaming lips'); assert.equal(result.rows.pop().name, 'the beach boys'); + sink.add(); })) })) @@ -37,12 +44,14 @@ test('api', function() { }) test('executing nested queries', function() { - pg.connect(helper.args, assert.calls(function(err, client) { + pg.connect(connectionString, assert.calls(function(err, client) { + assert.isNull(err); client.query('select now as now from NOW()', assert.calls(function(err, result) { assert.equal(new Date().getYear(), result.rows[0].now.getYear()) client.query('select now as now_again FROM NOW()', assert.calls(function() { client.query('select * FROM NOW()', assert.calls(function() { assert.ok('all queries hit') + sink.add(); })) })) })) @@ -55,5 +64,3 @@ test('raises error if cannot connect', function() { assert.ok(err, 'should have raised an error') })) }) - -pg.end(); diff --git a/test/integration/client/test-helper.js b/test/integration/client/test-helper.js index 3984d63a..6371171c 100644 --- a/test/integration/client/test-helper.js +++ b/test/integration/client/test-helper.js @@ -12,5 +12,7 @@ module.exports = { }); client.connect(); return client; - } + }, + connectionString: helper.connectionString, + Sink: helper.Sink }; diff --git a/test/integration/connection-pool/double-connection-tests.js b/test/integration/connection-pool/double-connection-tests.js index e69de29b..ae7eb316 100644 --- a/test/integration/connection-pool/double-connection-tests.js +++ b/test/integration/connection-pool/double-connection-tests.js @@ -0,0 +1,2 @@ +var helper = require(__dirname + "/test-helper") +helper.testPoolSize(2); diff --git a/test/integration/connection-pool/max-connection-tests.js b/test/integration/connection-pool/max-connection-tests.js index e69de29b..61755a0b 100644 --- a/test/integration/connection-pool/max-connection-tests.js +++ b/test/integration/connection-pool/max-connection-tests.js @@ -0,0 +1,3 @@ +var helper = require(__dirname + "/test-helper") +helper.testPoolSize(10); +helper.testPoolSize(11); diff --git a/test/integration/connection-pool/single-connection-tests.js b/test/integration/connection-pool/single-connection-tests.js index ad565e60..5ca0a888 100644 --- a/test/integration/connection-pool/single-connection-tests.js +++ b/test/integration/connection-pool/single-connection-tests.js @@ -1,32 +1,2 @@ var helper = require(__dirname + "/test-helper") - -setTimeout(function() { - helper.pg.defaults.poolSize = 10; - test('executes a single pooled connection/query', function() { - var args = helper.args; - var conString = "pg://"+args.user+":"+args.password+"@"+args.host+":"+args.port+"/"+args.database; - var queryCount = 0; - helper.pg.connect(conString, assert.calls(function(err, client) { - assert.isNull(err); - client.query("select * from NOW()", assert.calls(function(err, result) { - assert.isNull(err); - queryCount++; - })) - })) - var id = setTimeout(function() { - assert.equal(queryCount, 1) - }, 1000) - var check = function() { - setTimeout(function() { - if(queryCount == 1) { - clearTimeout(id) - helper.pg.end(); - } else { - check(); - } - }, 50) - } - check(); - }) -}, 1000) - +helper.testPoolSize(1); diff --git a/test/integration/connection-pool/test-helper.js b/test/integration/connection-pool/test-helper.js index 0e6a6ac8..aebea427 100644 --- a/test/integration/connection-pool/test-helper.js +++ b/test/integration/connection-pool/test-helper.js @@ -1,4 +1,31 @@ -module.exports = { - args: require(__dirname + "/../test-helper").args, - pg: require(__dirname + "/../../../lib") +var helper = require(__dirname + "/../test-helper"); +var pg = require(__dirname + "/../../../lib"); +helper.pg = pg; + +var testPoolSize = function(max) { + var conString = helper.connectionString(); + var sink = new helper.Sink(max, function() { + helper.pg.end(conString); + }); + + test("can pool " + max + " times", function() { + for(var i = 0; i < max; i++) { + helper.pg.poolSize = 10; + helper.pg.connect(conString, function(err, client) { + assert.isNull(err); + client.query("select * from NOW()", function() { + sink.add(); + }) + }) + } + }) } + +module.exports = { + args: helper.args, + pg: helper.pg, + connectionString: helper.connectionString, + Sink: helper.Sink, + testPoolSize: testPoolSize +} + diff --git a/test/integration/connection-pool/waiting-connection-tests.js b/test/integration/connection-pool/waiting-connection-tests.js index e69de29b..f2519ec5 100644 --- a/test/integration/connection-pool/waiting-connection-tests.js +++ b/test/integration/connection-pool/waiting-connection-tests.js @@ -0,0 +1,2 @@ +var helper = require(__dirname + "/test-helper") +helper.testPoolSize(200);