From ab13d0c1eba2993afd131e422bdece48c04eafdd Mon Sep 17 00:00:00 2001 From: Brian Carlson Date: Fri, 10 Dec 2010 17:32:34 -0600 Subject: [PATCH] initial crack at connection pooling -- still dirty --- lib/index.js | 100 +++++++++++++++++- lib/utils.js | 27 +++-- test/integration/client/api-tests.js | 7 +- .../double-connection-tests.js | 0 .../connection-pool/max-connection-tests.js | 0 .../single-connection-tests.js | 32 ++++++ .../connection-pool/test-helper.js | 4 + .../waiting-connection-tests.js | 0 test/test-helper.js | 2 +- test/unit/utils-tests.js | 26 ++++- 10 files changed, 184 insertions(+), 14 deletions(-) create mode 100644 test/integration/connection-pool/double-connection-tests.js create mode 100644 test/integration/connection-pool/max-connection-tests.js create mode 100644 test/integration/connection-pool/single-connection-tests.js create mode 100644 test/integration/connection-pool/test-helper.js create mode 100644 test/integration/connection-pool/waiting-connection-tests.js diff --git a/lib/index.js b/lib/index.js index 6f56b7e6..da3be82f 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,11 +1,106 @@ var EventEmitter = require('events').EventEmitter; var sys = require('sys'); var net = require('net'); - +var Pool = require(__dirname + '/utils').Pool; var Client = require(__dirname+'/client'); +var defaults = require(__dirname + '/defaults'); +//connection pool global cache +var clientPools = { +} + +var poolEnabled = function() { + return defaults.poolSize; +} + +var log = function() { + +} + +var log = function() { + console.log.apply(console, arguments); +} + +var getPooledClient = function(config, callback) { + //lookup pool using config as key + //TODO this don't work so hot w/ object configs + var pool = clientPools[config]; + + //create pool if doesn't exist + if(!pool) { + log("creating pool %s", config) + pool = clientPools[config] = new Pool(defaults.poolSize, function() { + log("creating new client in pool %s", config) + var client = new Client(config); + client.connected = false; + return client; + }) + } + + pool.checkOut(function(err, client) { + //if client already connected just + //pass it along to the callback and return + if(client.connected) { + callback(null, client); + return; + } + + var onError = function(error) { + client.connection.removeListener('readyForQuery', onReady); + callback(error); + pool.checkIn(client); + } + + var onReady = function() { + client.removeListener('error', onError); + client.connected = true; + callback(null, client); + client.on('drain', function() { + pool.checkIn(client); + }); + } + + client.once('error', onError); + + //TODO refactor + //i don't like reaching into the client's connection for attaching + //to specific events here + client.connection.once('readyForQuery', onReady); + + client.connect(); + + }); +} + +//destroys the world +var end = function(callback) { + for(var name in clientPools) { + var pool = clientPools[name]; + log("destroying pool %s", name); + pool.waits.forEach(function(wait) { + wait(new Error("Client is being destroyed")) + }) + pool.waits = []; + pool.items.forEach(function(item) { + var client = item.ref; + if(client.activeQuery) { + log("client is still active, waiting for it to complete"); + client.on('drain', client.end.bind(client)) + } else { + client.end(); + } + }) + //remove reference to pool lookup + clientPools[name] = null; + delete(clientPools[name]) + } +} //wrap up common connection management boilerplate var connect = function(config, callback) { + if(poolEnabled()) { + return getPooledClient(config, callback) + } + throw new Error("FUCK") var client = new Client(config); client.connect(); @@ -32,5 +127,6 @@ module.exports = { Client: Client, Connection: require(__dirname + '/connection'), connect: connect, - defaults: require(__dirname + '/defaults') + end: end, + defaults: defaults } diff --git a/lib/utils.js b/lib/utils.js index 94cdd46b..04023242 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,4 +1,5 @@ var events = require('events'); +var sys = require('sys'); if(typeof events.EventEmitter.prototype.once !== 'function') { events.EventEmitter.prototype.once = function (type, listener) { @@ -10,12 +11,13 @@ if(typeof events.EventEmitter.prototype.once !== 'function') { }; } var Pool = function(maxSize, createFn) { + events.EventEmitter.call(this); this.maxSize = maxSize; this.createFn = createFn; this.items = []; this.waits = []; } - +sys.inherits(Pool, events.EventEmitter); var p = Pool.prototype; p.checkOut = function(callback) { @@ -27,10 +29,19 @@ p.checkOut = function(callback) { } } //check if we can create a new item - if(len < this.maxSize && this.createFn) { - var item = {ref: this.createFn()} + if(this.items.length < this.maxSize && this.createFn) { + var result = this.createFn(); + var item = result; + //create function can return item conforming to interface + //of stored items to allow for create function to create + //checked out items + if(typeof item.checkedIn == "undefined") { + var item = {ref: result, checkedIn: true} + } this.items.push(item); - return this._pulse(item, callback) + if(item.checkedIn) { + return this._pulse(item, callback) + } } this.waits.push(callback); return false; //did not execute sync @@ -42,17 +53,19 @@ p.checkIn = function(item) { var currentItem = this.items[i]; if(currentItem.ref == item) { currentItem.checkedIn = true; - return this._pulse(currentItem); + this._pulse(currentItem); + return true; } } //add new item var newItem = {ref: item, checkedIn: true}; this.items.push(newItem); - return this._pulse(newItem); + this._pulse(newItem); + return false; } p._pulse = function(item, cb) { - cb = cb || this.waits.pop() + cb = cb || this.waits.shift() if(cb) { item.checkedIn = false; cb(null, item.ref) diff --git a/test/integration/client/api-tests.js b/test/integration/client/api-tests.js index efa8fb97..4478b78b 100644 --- a/test/integration/client/api-tests.js +++ b/test/integration/client/api-tests.js @@ -36,7 +36,6 @@ test('api', function() { })) }) - test('executing nested queries', function() { pg.connect(helper.args, assert.calls(function(err, client) { client.query('select now as now from NOW()', assert.calls(function(err, result) { @@ -50,9 +49,11 @@ test('executing nested queries', function() { })) }) - test('raises error if cannot connect', function() { - pg.connect({database:'asdlfkajsdf there is no way this is a real database, right?!'}, assert.calls(function(err, client) { + var connectionString = "pg://asdf@seoiasfd:4884/ieieie"; + pg.connect(connectionString, assert.calls(function(err, client) { assert.ok(err, 'should have raised an error') })) }) + +pg.end(); diff --git a/test/integration/connection-pool/double-connection-tests.js b/test/integration/connection-pool/double-connection-tests.js new file mode 100644 index 00000000..e69de29b diff --git a/test/integration/connection-pool/max-connection-tests.js b/test/integration/connection-pool/max-connection-tests.js new file mode 100644 index 00000000..e69de29b diff --git a/test/integration/connection-pool/single-connection-tests.js b/test/integration/connection-pool/single-connection-tests.js new file mode 100644 index 00000000..ad565e60 --- /dev/null +++ b/test/integration/connection-pool/single-connection-tests.js @@ -0,0 +1,32 @@ +var helper = require(__dirname + "/test-helper") + +setTimeout(function() { + helper.pg.defaults.poolSize = 10; + test('executes a single pooled connection/query', function() { + var args = helper.args; + var conString = "pg://"+args.user+":"+args.password+"@"+args.host+":"+args.port+"/"+args.database; + var queryCount = 0; + helper.pg.connect(conString, assert.calls(function(err, client) { + assert.isNull(err); + client.query("select * from NOW()", assert.calls(function(err, result) { + assert.isNull(err); + queryCount++; + })) + })) + var id = setTimeout(function() { + assert.equal(queryCount, 1) + }, 1000) + var check = function() { + setTimeout(function() { + if(queryCount == 1) { + clearTimeout(id) + helper.pg.end(); + } else { + check(); + } + }, 50) + } + check(); + }) +}, 1000) + diff --git a/test/integration/connection-pool/test-helper.js b/test/integration/connection-pool/test-helper.js new file mode 100644 index 00000000..0e6a6ac8 --- /dev/null +++ b/test/integration/connection-pool/test-helper.js @@ -0,0 +1,4 @@ +module.exports = { + args: require(__dirname + "/../test-helper").args, + pg: require(__dirname + "/../../../lib") +} diff --git a/test/integration/connection-pool/waiting-connection-tests.js b/test/integration/connection-pool/waiting-connection-tests.js new file mode 100644 index 00000000..e69de29b diff --git a/test/test-helper.js b/test/test-helper.js index e748fcaf..66f925b6 100644 --- a/test/test-helper.js +++ b/test/test-helper.js @@ -86,7 +86,7 @@ var expect = function(callback, timeout) { var executed = false; var id = setTimeout(function() { assert.ok(executed, "Expected execution of " + callback + " fired"); - }, timeout || 1000) + }, timeout || 2000) return function(err, queryResult) { clearTimeout(id); diff --git a/test/unit/utils-tests.js b/test/unit/utils-tests.js index dd71244e..88350cca 100644 --- a/test/unit/utils-tests.js +++ b/test/unit/utils-tests.js @@ -60,7 +60,7 @@ test('an empty pool', function() { })) assert.ok(sync, "Should have generated item & called callback in sync") }) - + test('creates again if item is checked out', function() { var sync = pool.checkOut(assert.calls(function(err, item) { assert.equal(item.name, "first2") @@ -87,6 +87,30 @@ test('an empty pool', function() { pool.checkIn(external); }) }) +}) + +test('when creating async new pool members', function() { + var count = 0; + var pool = new Pool(3, function() { + var item = {ref: {name: ++count}, checkedIn: false}; + process.nextTick(function() { + pool.checkIn(item.ref) + }) + return item; + }) + test('one request recieves member', function() { + pool.checkOut(assert.calls(function(err, item) { + assert.equal(item.name, 1) + pool.checkOut(assert.calls(function(err, item) { + assert.equal(item.name, 2) + pool.checkOut(assert.calls(function(err, item) { + assert.equal(item.name, 3) + })) + })) + })) + }) }) + +