From f9fc232db36a09633425d16aeb46099ac6a1a3c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20W=C3=BCrbach?= Date: Thu, 25 Jul 2019 19:38:21 +0200 Subject: [PATCH] Remove idleListener when client is in-use (#123) * Prevent double release with callback When using the callback instead of client.release, double releasing a client was possible causing clients to be re-added multiple times. * Remove idleListener when client is in-use When a client is in-use, the error handling should be done by the consumer and not by the pool itself as this otherwise might cause errors to be handled multiple times. * Handle verify failures --- index.js | 120 ++++++++++++++++++++++++++--------------- test/error-handling.js | 65 ++++++++++++++++++++-- 2 files changed, 138 insertions(+), 47 deletions(-) diff --git a/index.js b/index.js index cfe377c3..e93cc96c 100644 --- a/index.js +++ b/index.js @@ -12,8 +12,9 @@ const removeWhere = (list, predicate) => { } class IdleItem { - constructor (client, timeoutId) { + constructor (client, idleListener, timeoutId) { this.client = client + this.idleListener = idleListener this.timeoutId = timeoutId } } @@ -28,27 +29,6 @@ function throwOnRelease () { throw new Error('Release called on client which has already been released to the pool.') } -function release (client, err) { - client.release = throwOnRelease - if (err || this.ending) { - this._remove(client) - this._pulseQueue() - return - } - - // idle timeout - let tid - if (this.options.idleTimeoutMillis) { - tid = setTimeout(() => { - this.log('remove idle client') - this._remove(client) - }, this.options.idleTimeoutMillis) - } - - this._idle.push(new IdleItem(client, tid)) - this._pulseQueue() -} - function promisify (Promise, callback) { if (callback) { return { callback: callback, result: undefined } @@ -68,6 +48,7 @@ function promisify (Promise, callback) { function makeIdleListener (pool, client) { return function idleListener (err) { err.client = client + client.removeListener('error', idleListener) client.on('error', () => { pool.log('additional client error after disconnection due to error', err) @@ -132,17 +113,17 @@ class Pool extends EventEmitter { if (!this._idle.length && this._isFull()) { return } - const waiter = this._pendingQueue.shift() + const pendingItem = this._pendingQueue.shift() if (this._idle.length) { const idleItem = this._idle.pop() clearTimeout(idleItem.timeoutId) const client = idleItem.client - client.release = release.bind(this, client) - this.emit('acquire', client) - return waiter.callback(undefined, client, client.release) + const idleListener = idleItem.idleListener + + return this._acquireClient(client, pendingItem, idleListener, false) } if (!this._isFull()) { - return this.newClient(waiter) + return this.newClient(pendingItem) } throw new Error('unexpected condition') } @@ -249,26 +230,79 @@ class Pool extends EventEmitter { } } else { this.log('new client connected') - client.release = release.bind(this, client) - this.emit('connect', client) - this.emit('acquire', client) - if (!pendingItem.timedOut) { - if (this.options.verify) { - this.options.verify(client, pendingItem.callback) - } else { - pendingItem.callback(undefined, client, client.release) - } - } else { - if (this.options.verify) { - this.options.verify(client, client.release) - } else { - client.release() - } - } + + return this._acquireClient(client, pendingItem, idleListener, true) } }) } + // acquire a client for a pending work item + _acquireClient (client, pendingItem, idleListener, isNew) { + if (isNew) { + this.emit('connect', client) + } + + this.emit('acquire', client) + + let released = false + + client.release = (err) => { + if (released) { + throwOnRelease() + } + + released = true + this._release(client, idleListener, err) + } + + client.removeListener('error', idleListener) + + if (!pendingItem.timedOut) { + if (isNew && this.options.verify) { + this.options.verify(client, (err) => { + if (err) { + client.release(err) + return pendingItem.callback(err, undefined, NOOP) + } + + pendingItem.callback(undefined, client, client.release) + }) + } else { + pendingItem.callback(undefined, client, client.release) + } + } else { + if (isNew && this.options.verify) { + this.options.verify(client, client.release) + } else { + client.release() + } + } + } + + // release a client back to the poll, include an error + // to remove it from the pool + _release (client, idleListener, err) { + client.on('error', idleListener) + + if (err || this.ending) { + this._remove(client) + this._pulseQueue() + return + } + + // idle timeout + let tid + if (this.options.idleTimeoutMillis) { + tid = setTimeout(() => { + this.log('remove idle client') + this._remove(client) + }, this.options.idleTimeoutMillis) + } + + this._idle.push(new IdleItem(client, idleListener, tid)) + this._pulseQueue() + } + query (text, values, cb) { // guard clause against passing a function as the first parameter if (typeof text === 'function') { diff --git a/test/error-handling.js b/test/error-handling.js index e899c350..1e416683 100644 --- a/test/error-handling.js +++ b/test/error-handling.js @@ -43,6 +43,20 @@ describe('pool error handling', function () { expect(() => client.release()).to.throwError() return yield pool.end() })) + + it('should throw each time with callbacks', function (done) { + const pool = new Pool() + + pool.connect(function (err, client, clientDone) { + expect(err).not.to.be.an(Error) + clientDone() + + expect(() => clientDone()).to.throwError() + expect(() => clientDone()).to.throwError() + + pool.end(done) + }) + }) }) describe('calling connect after end', () => { @@ -101,13 +115,56 @@ describe('pool error handling', function () { client.release() yield new Promise((resolve, reject) => { process.nextTick(() => { + let poolError pool.once('error', (err) => { - expect(err.message).to.equal('expected') - expect(pool.idleCount).to.equal(0) - expect(pool.totalCount).to.equal(0) - pool.end().then(resolve, reject) + poolError = err }) + + let clientError + client.once('error', (err) => { + clientError = err + }) + client.emit('error', new Error('expected')) + + expect(clientError.message).to.equal('expected') + expect(poolError.message).to.equal('expected') + expect(pool.idleCount).to.equal(0) + expect(pool.totalCount).to.equal(0) + pool.end().then(resolve, reject) + }) + }) + })) + }) + + describe('error from in-use client', () => { + it('keeps the client in the pool', co.wrap(function * () { + const pool = new Pool() + const client = yield pool.connect() + expect(pool.totalCount).to.equal(1) + expect(pool.waitingCount).to.equal(0) + expect(pool.idleCount).to.equal(0) + + yield new Promise((resolve, reject) => { + process.nextTick(() => { + let poolError + pool.once('error', (err) => { + poolError = err + }) + + let clientError + client.once('error', (err) => { + clientError = err + }) + + client.emit('error', new Error('expected')) + + expect(clientError.message).to.equal('expected') + expect(poolError).not.to.be.ok() + expect(pool.idleCount).to.equal(0) + expect(pool.totalCount).to.equal(1) + client.release() + pool.end().then(resolve, reject) }) }) }))