Initial commit

This commit is contained in:
brianc 2016-06-07 19:16:19 -05:00
commit c0d39055f2
5 changed files with 272 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
node_modules

118
index.js Normal file
View File

@ -0,0 +1,118 @@
var genericPool = require('generic-pool')
var util = require('util')
var EventEmitter = require('events').EventEmitter
var debug = require('debug')
var Pool = module.exports = function(options) {
EventEmitter.call(this)
this.options = options || {}
this.log = this.options.log || debug('pg:pool')
this.Client = this.options.Client || require('pg').Client
this.Promise = this.options.Promise || Promise
this.options.max = this.options.max || this.options.poolSize || 10
this.options.create = this.options.create || this._create.bind(this)
this.options.destroy = this.options.destroy || this._destroy.bind(this)
this.pool = new genericPool.Pool(this.options)
}
util.inherits(Pool, EventEmitter)
Pool.prototype._destroy = function(client) {
if (client._destroying) return
client._destroying = true
client.end()
}
Pool.prototype._create = function(cb) {
this.log('connecting new client')
var client = new this.Client(this.options)
client.on('error', function(e) {
this.log('connected client error:', e)
this.pool.destroy(client)
e.client = client
this.emit('error', e)
}.bind(this))
client.connect(function(err) {
this.log('client connected')
if (err) {
this.log('client connection error:', e)
cb(err)
}
client.queryAsync = function(text, values) {
return new this.Promise((resolve, reject) => {
client.query(text, values, function(err, res) {
err ? reject(err) : resolve(res)
})
})
}.bind(this)
cb(err, err ? null : client)
}.bind(this))
}
Pool.prototype.connect = function(cb) {
return new this.Promise(function(resolve, reject) {
this.log('acquire client begin')
this.pool.acquire(function(err, client) {
if (err) {
this.log('acquire client. error:', err)
if (cb) {
cb(err, null, function() { })
}
return reject(err)
}
this.log('acquire client')
client.release = function(err) {
if (err) {
this.log('release client. error:', err)
this.pool.destroy(client)
}
this.log('release client')
delete client.release
this.pool.release(client)
}.bind(this)
if (cb) {
cb(null, client, client.release)
}
return resolve(client)
}.bind(this))
}.bind(this))
}
Pool.prototype.take = Pool.prototype.connect
Pool.prototype.query = function(text, values) {
return this.take().then(function(client) {
return client.queryAsync(text, values)
.then(function(res) {
client.release()
return res
}).catch(function(error) {
client.release(error)
throw error
})
})
}
Pool.prototype.end = function(cb) {
this.log('draining pool')
return new this.Promise(function(resolve, reject) {
this.pool.drain(function() {
this.log('pool drained, calling destroy all now')
this.pool.destroyAllNow(function() {
if(cb) {
cb()
}
resolve()
})
}.bind(this))
}.bind(this))
}

40
package.json Normal file
View File

@ -0,0 +1,40 @@
{
"name": "pg-pool",
"version": "0.0.1",
"description": "Connection pool for node-postgres",
"main": "index.js",
"directories": {
"test": "test"
},
"scripts": {
"test": "node_modules/.bin/mocha"
},
"repository": {
"type": "git",
"url": "git://github.com/brianc/node-pg-pool.git"
},
"keywords": [
"pg",
"postgres",
"pool",
"database"
],
"author": "Brian M. Carlson",
"license": "MIT",
"bugs": {
"url": "https://github.com/brianc/node-pg-pool/issues"
},
"homepage": "https://github.com/brianc/node-pg-pool#readme",
"devDependencies": {
"bluebird": "3.4.0",
"co": "4.6.0",
"expect.js": "0.3.1",
"lodash": "4.13.1",
"mocha": "^2.3.3",
"pg": "4.5.6"
},
"dependencies": {
"debug": "^2.2.0",
"generic-pool": "2.4.2"
}
}

111
test/index.js Normal file
View File

@ -0,0 +1,111 @@
var expect = require('expect.js')
var Client = require('pg').Client
var co = require('co')
var Promise = require('bluebird')
var _ = require('lodash')
var Pool = require('../')
describe('pool', function() {
describe('with callbacks', function() {
it('works totally unconfigured', function(done) {
const pool = new Pool()
pool.connect(function(err, client, release) {
if (err) return done(err)
client.query('SELECT NOW()', function(err, res) {
release()
if (err) return done(err)
expect(res.rows).to.have.length(1)
pool.end(done)
})
})
})
it('passes props to clients', function(done) {
const pool = new Pool({ binary: true })
pool.connect(function(err, client, release) {
release()
expect(client.binary).to.eql(true)
pool.end(done)
})
})
it('removes client if it errors in background', function(done) {
const pool = new Pool()
pool.connect(function(err, client, release) {
release()
client.testString = 'foo'
setTimeout(function() {
client.emit('error', new Error('on purpose'))
}, 10)
})
pool.on('error', function(err) {
expect(err.message).to.be('on purpose')
expect(err.client).to.not.be(undefined)
expect(err.client.testString).to.be('foo')
err.client.connection.stream.on('end', function() {
pool.end(done)
})
})
})
})
describe('with promises', function() {
it('connects and disconnects', co.wrap(function*() {
var pool = new Pool()
var client = yield pool.connect()
expect(pool.pool.availableObjectsCount()).to.be(0)
var res = yield client.queryAsync('select $1::text as name', ['hi'])
expect(res.rows).to.eql([{ name: 'hi' }])
client.release()
expect(pool.pool.getPoolSize()).to.be(1)
expect(pool.pool.availableObjectsCount()).to.be(1)
return yield pool.end()
}))
it('properly pools clients', co.wrap(function*() {
var pool = new Pool({ poolSize: 9 })
var count = 0
while (count < 30) {
count++
pool.connect().then(function(client) {
client.queryAsync('select $1::text as name', ['hi']).then(function(res) {
client.release()
})
})
}
yield Promise.delay(100)
expect(pool.pool.getPoolSize()).to.be(9)
return yield pool.end()
}))
it('supports just running queries', co.wrap(function*() {
var pool = new Pool({ poolSize: 9 })
var count = 0
var queries = _.times(30).map(function() {
return pool.query('SELECT $1::text as name', ['hi'])
})
console.log('executing')
yield queries
expect(pool.pool.getPoolSize()).to.be(9)
expect(pool.pool.availableObjectsCount()).to.be(9)
return yield pool.end()
}))
it('recovers from all errors', co.wrap(function*() {
var pool = new Pool({ poolSize: 9 })
var count = 0
while(count++ < 30) {
try {
yield pool.query('SELECT lksjdfd')
} catch(e) {
}
}
var res = yield pool.query('SELECT $1::text as name', ['hi'])
expect(res.rows).to.eql([{ name: 'hi' }])
return yield pool.end()
}))
})
})

2
test/mocha.opts Normal file
View File

@ -0,0 +1,2 @@
--no-exit
--bail