From c0d39055f2b5bc2fb6bf2b46c7f80980846fe73a Mon Sep 17 00:00:00 2001 From: brianc Date: Tue, 7 Jun 2016 19:16:19 -0500 Subject: [PATCH] Initial commit --- .gitignore | 1 + index.js | 118 ++++++++++++++++++++++++++++++++++++++++++++++++ package.json | 40 ++++++++++++++++ test/index.js | 111 +++++++++++++++++++++++++++++++++++++++++++++ test/mocha.opts | 2 + 5 files changed, 272 insertions(+) create mode 100644 .gitignore create mode 100644 index.js create mode 100644 package.json create mode 100644 test/index.js create mode 100644 test/mocha.opts diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..3c3629e6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/index.js b/index.js new file mode 100644 index 00000000..7c102d8a --- /dev/null +++ b/index.js @@ -0,0 +1,118 @@ +var genericPool = require('generic-pool') +var util = require('util') +var EventEmitter = require('events').EventEmitter +var debug = require('debug') + +var Pool = module.exports = function(options) { + EventEmitter.call(this) + this.options = options || {} + this.log = this.options.log || debug('pg:pool') + this.Client = this.options.Client || require('pg').Client + this.Promise = this.options.Promise || Promise + + this.options.max = this.options.max || this.options.poolSize || 10 + this.options.create = this.options.create || this._create.bind(this) + this.options.destroy = this.options.destroy || this._destroy.bind(this) + this.pool = new genericPool.Pool(this.options) +} + +util.inherits(Pool, EventEmitter) + +Pool.prototype._destroy = function(client) { + if (client._destroying) return + client._destroying = true + client.end() +} + +Pool.prototype._create = function(cb) { + this.log('connecting new client') + var client = new this.Client(this.options) + + client.on('error', function(e) { + this.log('connected client error:', e) + this.pool.destroy(client) + e.client = client + this.emit('error', e) + }.bind(this)) + + client.connect(function(err) { + this.log('client connected') + if (err) { + this.log('client connection error:', e) + cb(err) + } + + client.queryAsync = function(text, values) { + return new this.Promise((resolve, reject) => { + client.query(text, values, function(err, res) { + err ? reject(err) : resolve(res) + }) + }) + }.bind(this) + + cb(err, err ? null : client) + }.bind(this)) +} + +Pool.prototype.connect = function(cb) { + return new this.Promise(function(resolve, reject) { + this.log('acquire client begin') + this.pool.acquire(function(err, client) { + if (err) { + this.log('acquire client. error:', err) + if (cb) { + cb(err, null, function() { }) + } + return reject(err) + } + + this.log('acquire client') + + client.release = function(err) { + if (err) { + this.log('release client. error:', err) + this.pool.destroy(client) + } + this.log('release client') + delete client.release + this.pool.release(client) + }.bind(this) + + if (cb) { + cb(null, client, client.release) + } + + return resolve(client) + }.bind(this)) + }.bind(this)) +} + +Pool.prototype.take = Pool.prototype.connect + +Pool.prototype.query = function(text, values) { + return this.take().then(function(client) { + return client.queryAsync(text, values) + .then(function(res) { + client.release() + return res + }).catch(function(error) { + client.release(error) + throw error + }) + }) +} + +Pool.prototype.end = function(cb) { + this.log('draining pool') + return new this.Promise(function(resolve, reject) { + this.pool.drain(function() { + this.log('pool drained, calling destroy all now') + this.pool.destroyAllNow(function() { + if(cb) { + cb() + } + resolve() + }) + }.bind(this)) + }.bind(this)) +} diff --git a/package.json b/package.json new file mode 100644 index 00000000..7d8b8f09 --- /dev/null +++ b/package.json @@ -0,0 +1,40 @@ +{ + "name": "pg-pool", + "version": "0.0.1", + "description": "Connection pool for node-postgres", + "main": "index.js", + "directories": { + "test": "test" + }, + "scripts": { + "test": "node_modules/.bin/mocha" + }, + "repository": { + "type": "git", + "url": "git://github.com/brianc/node-pg-pool.git" + }, + "keywords": [ + "pg", + "postgres", + "pool", + "database" + ], + "author": "Brian M. Carlson", + "license": "MIT", + "bugs": { + "url": "https://github.com/brianc/node-pg-pool/issues" + }, + "homepage": "https://github.com/brianc/node-pg-pool#readme", + "devDependencies": { + "bluebird": "3.4.0", + "co": "4.6.0", + "expect.js": "0.3.1", + "lodash": "4.13.1", + "mocha": "^2.3.3", + "pg": "4.5.6" + }, + "dependencies": { + "debug": "^2.2.0", + "generic-pool": "2.4.2" + } +} diff --git a/test/index.js b/test/index.js new file mode 100644 index 00000000..4ea43cd0 --- /dev/null +++ b/test/index.js @@ -0,0 +1,111 @@ +var expect = require('expect.js') +var Client = require('pg').Client +var co = require('co') +var Promise = require('bluebird') +var _ = require('lodash') + +var Pool = require('../') + +describe('pool', function() { + + describe('with callbacks', function() { + it('works totally unconfigured', function(done) { + const pool = new Pool() + pool.connect(function(err, client, release) { + if (err) return done(err) + client.query('SELECT NOW()', function(err, res) { + release() + if (err) return done(err) + expect(res.rows).to.have.length(1) + pool.end(done) + }) + }) + }) + + it('passes props to clients', function(done) { + const pool = new Pool({ binary: true }) + pool.connect(function(err, client, release) { + release() + expect(client.binary).to.eql(true) + pool.end(done) + }) + }) + + it('removes client if it errors in background', function(done) { + const pool = new Pool() + pool.connect(function(err, client, release) { + release() + client.testString = 'foo' + setTimeout(function() { + client.emit('error', new Error('on purpose')) + }, 10) + }) + pool.on('error', function(err) { + expect(err.message).to.be('on purpose') + expect(err.client).to.not.be(undefined) + expect(err.client.testString).to.be('foo') + err.client.connection.stream.on('end', function() { + pool.end(done) + }) + }) + }) + }) + + describe('with promises', function() { + it('connects and disconnects', co.wrap(function*() { + var pool = new Pool() + var client = yield pool.connect() + expect(pool.pool.availableObjectsCount()).to.be(0) + var res = yield client.queryAsync('select $1::text as name', ['hi']) + expect(res.rows).to.eql([{ name: 'hi' }]) + client.release() + expect(pool.pool.getPoolSize()).to.be(1) + expect(pool.pool.availableObjectsCount()).to.be(1) + return yield pool.end() + })) + + it('properly pools clients', co.wrap(function*() { + var pool = new Pool({ poolSize: 9 }) + var count = 0 + while (count < 30) { + count++ + pool.connect().then(function(client) { + client.queryAsync('select $1::text as name', ['hi']).then(function(res) { + client.release() + }) + }) + } + yield Promise.delay(100) + expect(pool.pool.getPoolSize()).to.be(9) + return yield pool.end() + })) + + it('supports just running queries', co.wrap(function*() { + var pool = new Pool({ poolSize: 9 }) + var count = 0 + var queries = _.times(30).map(function() { + return pool.query('SELECT $1::text as name', ['hi']) + }) + console.log('executing') + yield queries + expect(pool.pool.getPoolSize()).to.be(9) + expect(pool.pool.availableObjectsCount()).to.be(9) + return yield pool.end() + })) + + it('recovers from all errors', co.wrap(function*() { + var pool = new Pool({ poolSize: 9 }) + var count = 0 + + while(count++ < 30) { + try { + yield pool.query('SELECT lksjdfd') + } catch(e) { + } + } + var res = yield pool.query('SELECT $1::text as name', ['hi']) + expect(res.rows).to.eql([{ name: 'hi' }]) + return yield pool.end() + })) + }) +}) diff --git a/test/mocha.opts b/test/mocha.opts new file mode 100644 index 00000000..46e8e69d --- /dev/null +++ b/test/mocha.opts @@ -0,0 +1,2 @@ +--no-exit +--bail