node-postgres/index.js
Brian C a0eb36d819 2.0 (#67)
* Initial work

* Make progress on custom pool

* Make all original tests pass

* Fix test race

* Fix test when DNS is missing

* Test more error conditions

* Add test for byop

* Add BYOP tests for errors

* Add test for idle client error expunging

* Fix typo

* Replace var with const/let

* Remove var usage

* Fix linting

* Work on connection timeout

* Work on error condition tests

* Remove logging

* Add connection timeout

* Add idle timeout

* Test for returning to client to pool after error

fixes #48

* Add idleTimeout support to native client

* Add pg as peer dependency

fixes #45

* Rename properties

* Fix lint

* use strict

* Add draining to pool.end

* Ensure ending pools drain properly

* Remove yarn.lock

* Remove object-assign

* Remove node 8

* Remove closure for waiter construction

* Ensure client.connect is never sync

* Fix lint

* Change to es6 class

* Code cleanup & lint fixes
2017-07-13 22:37:08 -05:00

249 lines
6.4 KiB
JavaScript

'use strict'
const EventEmitter = require('events').EventEmitter
// Do-nothing function; handed to connect callbacks in place of a release
// function when the connection attempt failed and there is nothing to release.
const NOOP = function () { }
/**
 * Plain record stored in the pool's idle list: a client together with the
 * timer handle that was created for it when it was released (may be undefined).
 */
class IdleItem {
  constructor (client, timeoutId) {
    Object.assign(this, { client, timeoutId })
  }
}
/**
 * Replacement for `client.release` once a client has been handed back:
 * any further call is a programming error and always throws.
 *
 * @throws {Error} unconditionally
 */
function throwOnRelease () {
  const alreadyReleased = new Error('Release called on client which has already been released to the pool.')
  throw alreadyReleased
}
/**
 * Hands a checked-out client back to the pool. Must be invoked with `this`
 * set to the owning pool (the pool installs it as
 * `client.release = release.bind(pool, client)`).
 *
 * - With a truthy `err`, or while the pool is ending, the client is removed
 *   from the pool instead of being kept.
 * - Otherwise the client is pushed onto the idle list, and if
 *   `options.idleTimeoutMillis` is set a timer is armed that removes the
 *   client once it fires.
 *
 * In every case `client.release` is replaced so a second release throws, and
 * the pool's queue is pulsed afterwards.
 *
 * @param {object} client - the client being returned
 * @param {*} [err] - truthy when the client should be discarded
 */
function release (client, err) {
  client.release = throwOnRelease
  // Check `ending` before arming the idle timer: a client that is removed
  // right away must not leave a timer behind that would remove it again later.
  if (err || this.ending) {
    this._remove(client)
    this._pulseQueue()
    return
  }
  // idle timeout
  let tid
  const idleTimeoutMillis = this.options.idleTimeoutMillis
  if (idleTimeoutMillis) {
    // The delay has to come from `options`; the pool has no
    // `idleTimeoutMillis` property of its own, and an undefined delay makes
    // the timer fire almost immediately.
    tid = setTimeout(() => {
      this.log('remove idle client')
      this._remove(client)
    }, idleTimeoutMillis)
  }
  this._idle.push(new IdleItem(client, tid))
  this._pulseQueue()
}
/**
 * Normalises the "callback or promise" calling convention.
 *
 * When a callback is supplied it is returned untouched and `result` is
 * undefined. Otherwise a new promise is created with the given constructor
 * and a node-style callback is returned that settles it: a truthy first
 * argument rejects, anything else resolves with the second argument.
 *
 * @param {Function} Promise - promise constructor to use
 * @param {Function} [callback] - caller-supplied node-style callback
 * @returns {{callback: Function, result: (Promise|undefined)}}
 */
function promisify (Promise, callback) {
  if (callback) {
    return { callback: callback, result: undefined }
  }
  const settle = { resolve: undefined, reject: undefined }
  const pending = new Promise(function (resolve, reject) {
    settle.resolve = resolve
    settle.reject = reject
  })
  const nodeStyle = function (err, client) {
    if (err) {
      settle.reject(err)
    } else {
      settle.resolve(client)
    }
  }
  return { callback: nodeStyle, result: pending }
}
/**
 * Client pool. Keeps at most `options.max` clients, hands idle ones out to
 * callers, queues callers when none is available and optionally evicts
 * clients on connection/idle timeouts.
 *
 * Events emitted by the pool (all with the client as an argument):
 * - 'connect' - a new client finished connecting
 * - 'acquire' - a client is about to be handed to a caller
 * - 'remove'  - a client was dropped from the pool and `end()` was called on it
 * - 'error'   - `(err, client)`; a connected client emitted 'error' and has
 *               already been removed from the pool
 */
class Pool extends EventEmitter {
  /**
   * @param {object} [options] - shallow-copied. Keys read by the pool: `max`
   *   (falls back to `poolSize`, then 10), `log`, `Client`, `Promise`,
   *   `idleTimeoutMillis`, `connectionTimeoutMillis`, `verify`. The copied
   *   object is also passed to the client constructor.
   * @param {Function} [Client] - client constructor; `options.Client` wins,
   *   and `require('pg').Client` is used when neither is given.
   */
  constructor (options, Client) {
    super()
    this.options = Object.assign({}, options)
    this.options.max = this.options.max || this.options.poolSize || 10
    this.log = this.options.log || function () { }
    this.Client = this.options.Client || Client || require('pg').Client
    this.Promise = this.options.Promise || global.Promise
    // every client created by the pool that has not been removed yet
    this._clients = []
    // IdleItem entries for clients that are currently not checked out
    this._idle = []
    // callbacks of callers waiting for a client
    this._pendingQueue = []
    this._endCallback = undefined
    this.ending = false
  }

  /** @returns {boolean} true when the pool holds `options.max` clients or more. */
  _isFull () {
    return this._clients.length >= this.options.max
  }

  /**
   * Tries to make progress: serves the next queued caller from an idle client
   * or a fresh connection. While the pool is ending it instead removes idle
   * clients and fires the end callback once no client is left; queued callers
   * are not served in that state.
   */
  _pulseQueue () {
    this.log('pulse queue')
    if (this.ending) {
      this.log('pulse queue on ending')
      // Iterate over a snapshot: _remove replaces this._idle on every call.
      this._idle.slice().forEach(item => {
        this._remove(item.client)
      })
      if (!this._clients.length) {
        this._endCallback()
      }
      return
    }
    // if we don't have any waiting, do nothing
    if (!this._pendingQueue.length) {
      this.log('no queued requests')
      return
    }
    // if we don't have any idle clients and we have no more room do nothing
    if (!this._idle.length && this._isFull()) {
      return
    }
    const waiter = this._pendingQueue.shift()
    if (this._idle.length) {
      const idleItem = this._idle.pop()
      // the client is in use again, so its idle eviction must not fire
      clearTimeout(idleItem.timeoutId)
      const client = idleItem.client
      client.release = release.bind(this, client)
      this.emit('acquire', client)
      return waiter(undefined, client, client.release)
    }
    if (!this._isFull()) {
      return this.connect(waiter)
    }
    throw new Error('unexpected condition')
  }

  /**
   * Drops a client from the pool's bookkeeping, ends it and emits 'remove'.
   * If the client was sitting in the idle list its idle timer is cancelled,
   * so the timer cannot fire later and remove/end the same client again.
   *
   * @param {object} client
   */
  _remove (client) {
    const stillIdle = []
    for (const item of this._idle) {
      if (item.client === client) {
        clearTimeout(item.timeoutId)
      } else {
        stillIdle.push(item)
      }
    }
    this._idle = stillIdle
    this._clients = this._clients.filter(c => c !== client)
    client.end()
    this.emit('remove', client)
  }

  /**
   * Acquires a client.
   *
   * If the pool is full or has idle clients the request is queued and served
   * by `_pulseQueue`; otherwise a new client is created and connected.
   *
   * @param {Function} [cb] - `(err, client, release)`. When omitted a promise
   *   (from `promisify`) is returned instead.
   * @returns {Promise|undefined} whatever `promisify` yields as `result`, or
   *   the return value of `cb` / a rejected promise when the pool is ending.
   */
  connect (cb) {
    if (this.ending) {
      const err = new Error('Cannot use a pool after calling end on the pool')
      return cb ? cb(err) : this.Promise.reject(err)
    }
    if (this._isFull() || this._idle.length) {
      const response = promisify(this.Promise, cb)
      const result = response.result
      this._pendingQueue.push(response.callback)
      // if we have idle clients schedule a pulse immediately
      if (this._idle.length) {
        process.nextTick(() => this._pulseQueue())
      }
      return result
    }
    const client = new this.Client(this.options)
    this._clients.push(client)
    // Handles 'error' events of a connected client: the client is purged from
    // the pool and the error is re-emitted on the pool itself.
    const idleListener = (err) => {
      err.client = client
      client.removeListener('error', idleListener)
      // keep a listener attached so later errors on the dead client are only logged
      client.on('error', () => {
        this.log('additional client error after disconnection due to error', err)
      })
      this._remove(client)
      // TODO - document that once the pool emits an error
      // the client has already been closed & purged and is unusable
      this.emit('error', err, client)
    }
    this.log('connecting new client')
    // connection timeout logic
    let tid
    let timeoutHit = false
    if (this.options.connectionTimeoutMillis) {
      tid = setTimeout(() => {
        this.log('ending client due to timeout')
        timeoutHit = true
        // force kill the node driver, and let libpq do its teardown
        if (client.connection) {
          client.connection.stream.destroy()
        } else {
          client.end()
        }
      }, this.options.connectionTimeoutMillis)
    }
    const response = promisify(this.Promise, cb)
    cb = response.callback
    client.connect((err) => {
      this.log('new client connected')
      if (tid) {
        clearTimeout(tid)
      }
      client.on('error', idleListener)
      if (err) {
        // remove the dead client from our list of clients
        this._clients = this._clients.filter(c => c !== client)
        if (timeoutHit) {
          err.message = 'Connection terminated due to connection timeout'
        }
        cb(err, undefined, NOOP)
      } else {
        client.release = release.bind(this, client)
        this.emit('connect', client)
        this.emit('acquire', client)
        if (this.options.verify) {
          // options.verify receives the callback and is expected to invoke it
          this.options.verify(client, cb)
        } else {
          cb(undefined, client, client.release)
        }
      }
    })
    return response.result
  }

  /**
   * Runs a single query on a pooled client and releases the client afterwards
   * (passing the query error to `release`, which discards the client on error).
   *
   * @param {*} text - forwarded to `client.query`
   * @param {*} [values] - forwarded to `client.query`; may be the callback
   * @param {Function} [cb] - `(err, res)`; a promise is returned when omitted
   * @returns {Promise|undefined}
   */
  query (text, values, cb) {
    if (typeof values === 'function') {
      cb = values
      values = undefined
    }
    const response = promisify(this.Promise, cb)
    cb = response.callback
    this.connect((err, client) => {
      if (err) {
        return cb(err)
      }
      this.log('dispatching query')
      client.query(text, values, (err, res) => {
        this.log('query dispatched')
        client.release(err)
        if (err) {
          return cb(err)
        }
        return cb(undefined, res)
      })
    })
    return response.result
  }

  /**
   * Starts shutting the pool down. Idle clients are removed right away; the
   * callback/promise settles once the pool holds no clients any more.
   *
   * @param {Function} [cb] - called when the pool is empty, or with an error
   *   when `end` had already been called.
   * @returns {Promise|undefined}
   */
  end (cb) {
    this.log('ending')
    if (this.ending) {
      const err = new Error('Called end on pool more than once')
      return cb ? cb(err) : this.Promise.reject(err)
    }
    this.ending = true
    const promised = promisify(this.Promise, cb)
    this._endCallback = promised.callback
    this._pulseQueue()
    return promised.result
  }

  /** Number of queued callers waiting for a client. */
  get waitingCount () {
    return this._pendingQueue.length
  }

  /** Number of clients currently idle in the pool. */
  get idleCount () {
    return this._idle.length
  }

  /** Number of clients the pool currently holds (idle or checked out). */
  get totalCount () {
    return this._clients.length
  }
}
// The Pool class is this module's only export.
module.exports = Pool