Queued query errors (#1503)

* Add tests for query callbacks after connection-level errors

* Ensure callbacks are executed for all queued queries after connection-level errors

Separates socket errors from error messages, sends socket errors to all queries in the queue, marks clients as unusable after socket errors.

This is not very pleasant but should maintain backwards compatibility…?

* Always call `handleError` asynchronously

This doesn’t match the original behaviour of the type errors, but it’s correct.

* Fix return value of `Client.prototype.query` in immediate error cases

* Mark clients with closed connections as unusable consistently

* Add tests for error event when connecting Client

* Ensure the promise and callback versions of Client#connect always have the same behaviour

* Give same error to queued queries as to active query when ending

and do so in the native Client as well.

* Restore original ordering between queued query callbacks and 'end' event
This commit is contained in:
Charmander 2018-10-03 08:37:15 -07:00 committed by Brian C
parent fed6375e0a
commit 3828aa8608
6 changed files with 256 additions and 96 deletions

View File

@ -36,6 +36,7 @@ var Client = function (config) {
this._connecting = false
this._connected = false
this._connectionError = false
this._queryable = true
this.connection = c.connection || new Connection({
stream: c.stream,
@ -52,16 +53,31 @@ var Client = function (config) {
util.inherits(Client, EventEmitter)
Client.prototype.connect = function (callback) {
Client.prototype._errorAllQueries = function (err) {
const enqueueError = (query) => {
process.nextTick(() => {
query.handleError(err, this.connection)
})
}
if (this.activeQuery) {
enqueueError(this.activeQuery)
this.activeQuery = null
}
this.queryQueue.forEach(enqueueError)
this.queryQueue.length = 0
}
Client.prototype._connect = function (callback) {
var self = this
var con = this.connection
if (this._connecting || this._connected) {
const err = new Error('Client has already been connected. You cannot reuse a client.')
if (callback) {
process.nextTick(() => {
callback(err)
return undefined
}
return Promise.reject(err)
})
return
}
this._connecting = true
@ -126,15 +142,25 @@ Client.prototype.connect = function (callback) {
}
const connectedErrorHandler = (err) => {
if (this.activeQuery) {
var activeQuery = self.activeQuery
this.activeQuery = null
return activeQuery.handleError(err, con)
}
this._queryable = false
this._errorAllQueries(err)
this.emit('error', err)
}
const connectedErrorMessageHandler = (msg) => {
const activeQuery = this.activeQuery
if (!activeQuery) {
connectedErrorHandler(msg)
return
}
this.activeQuery = null
activeQuery.handleError(msg, con)
}
con.on('error', connectingErrorHandler)
con.on('errorMessage', connectingErrorHandler)
// hook up query handling events to connection
// after the connection initially becomes ready for queries
@ -143,7 +169,9 @@ Client.prototype.connect = function (callback) {
self._connected = true
self._attachListeners(con)
con.removeListener('error', connectingErrorHandler)
con.removeListener('errorMessage', connectingErrorHandler)
con.on('error', connectedErrorHandler)
con.on('errorMessage', connectedErrorMessageHandler)
// process possible callback argument to Client#connect
if (callback) {
@ -166,43 +194,53 @@ Client.prototype.connect = function (callback) {
})
con.once('end', () => {
if (this.activeQuery) {
var disconnectError = new Error('Connection terminated')
this.activeQuery.handleError(disconnectError, con)
this.activeQuery = null
}
const error = this._ending
? new Error('Connection terminated')
: new Error('Connection terminated unexpectedly')
this._errorAllQueries(error)
if (!this._ending) {
// if the connection is ended without us calling .end()
// on this client then we have an unexpected disconnection
// treat this as an error unless we've already emitted an error
// during connection.
const error = new Error('Connection terminated unexpectedly')
if (this._connecting && !this._connectionError) {
if (callback) {
callback(error)
} else {
this.emit('error', error)
connectedErrorHandler(error)
}
} else if (!this._connectionError) {
this.emit('error', error)
connectedErrorHandler(error)
}
}
this.emit('end')
process.nextTick(() => {
this.emit('end')
})
})
con.on('notice', function (msg) {
self.emit('notice', msg)
})
}
if (!callback) {
return new global.Promise((resolve, reject) => {
this.once('error', reject)
this.once('connect', () => {
this.removeListener('error', reject)
resolve()
})
})
Client.prototype.connect = function (callback) {
if (callback) {
this._connect(callback)
return
}
return new Promise((resolve, reject) => {
this._connect((error) => {
if (error) {
reject(error)
} else {
resolve()
}
})
})
}
Client.prototype._attachListeners = function (con) {
@ -340,7 +378,15 @@ Client.prototype._pulseQueryQueue = function () {
if (this.activeQuery) {
this.readyForQuery = false
this.hasExecuted = true
this.activeQuery.submit(this.connection)
const queryError = this.activeQuery.submit(this.connection)
if (queryError) {
process.nextTick(() => {
this.activeQuery.handleError(queryError, this.connection)
this.readyForQuery = true
this._pulseQueryQueue()
})
}
} else if (this.hasExecuted) {
this.activeQuery = null
this.emit('drain')
@ -379,6 +425,20 @@ Client.prototype.query = function (config, values, callback) {
query._result._getTypeParser = this._types.getTypeParser.bind(this._types)
}
if (!this._queryable) {
process.nextTick(() => {
query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
})
return result
}
if (this._ending) {
process.nextTick(() => {
query.handleError(new Error('Client was closed and is not queryable'), this.connection)
})
return result
}
this.queryQueue.push(query)
this._pulseQueryQueue()
return result
@ -386,18 +446,19 @@ Client.prototype.query = function (config, values, callback) {
Client.prototype.end = function (cb) {
this._ending = true
if (this.activeQuery) {
// if we have an active query we need to force a disconnect
// on the socket - otherwise a hung query could block end forever
this.connection.stream.destroy(new Error('Connection terminated by user'))
return cb ? cb() : Promise.resolve()
}
if (cb) {
this.connection.stream.destroy()
} else {
this.connection.end()
}
if (cb) {
this.connection.once('end', cb)
} else {
return new global.Promise((resolve, reject) => {
this.connection.end()
return new Promise((resolve) => {
this.connection.once('end', resolve)
})
}

View File

@ -117,10 +117,11 @@ Connection.prototype.attachListeners = function (stream) {
var packet = self._reader.read()
while (packet) {
var msg = self.parseMessage(packet)
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
if (self._emitMessage) {
self.emit('message', msg)
}
self.emit(msg.name, msg)
self.emit(eventName, msg)
packet = self._reader.read()
}
})

View File

@ -32,8 +32,10 @@ var Client = module.exports = function (config) {
})
this._queryQueue = []
this._connected = false
this._ending = false
this._connecting = false
this._connected = false
this._queryable = true
// keep these on the object for legacy reasons
// for the time being. TODO: deprecate all this jazz
@ -52,50 +54,48 @@ Client.Query = NativeQuery
util.inherits(Client, EventEmitter)
// connect to the backend
// pass an optional callback to be called once connected
// or with an error if there was a connection error
// if no callback is passed and there is a connection error
// the client will emit an error event.
Client.prototype.connect = function (cb) {
var self = this
var onError = function (err) {
if (cb) return cb(err)
return self.emit('error', err)
}
var result
if (!cb) {
var resolveOut, rejectOut
cb = (err) => err ? rejectOut(err) : resolveOut()
result = new global.Promise(function (resolve, reject) {
resolveOut = resolve
rejectOut = reject
Client.prototype._errorAllQueries = function (err) {
const enqueueError = (query) => {
process.nextTick(() => {
query.native = this.native
query.handleError(err)
})
}
if (this._hasActiveQuery()) {
enqueueError(this._activeQuery)
this._activeQuery = null
}
this._queryQueue.forEach(enqueueError)
this._queryQueue.length = 0
}
// connect to the backend
// pass an optional callback to be called once connected
// or with an error if there was a connection error
Client.prototype._connect = function (cb) {
var self = this
if (this._connecting) {
process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))
return result
return
}
this._connecting = true
this.connectionParameters.getLibpqConnectionString(function (err, conString) {
if (err) return onError(err)
if (err) return cb(err)
self.native.connect(conString, function (err) {
if (err) return onError(err)
if (err) return cb(err)
// set internal states to connected
self._connected = true
// handle connection errors from the native layer
self.native.on('error', function (err) {
// error will be handled by active query
if (self._activeQuery && self._activeQuery.state !== 'end') {
return
}
self._queryable = false
self._errorAllQueries(err)
self.emit('error', err)
})
@ -110,12 +110,26 @@ Client.prototype.connect = function (cb) {
self.emit('connect')
self._pulseQueryQueue(true)
// possibly call the optional callback
if (cb) cb()
cb()
})
})
}
return result
Client.prototype.connect = function (callback) {
if (callback) {
this._connect(callback)
return
}
return new Promise((resolve, reject) => {
this._connect((error) => {
if (error) {
reject(error)
} else {
resolve()
}
})
})
}
// send a query to the server
@ -129,26 +143,43 @@ Client.prototype.connect = function (cb) {
// optional string rowMode = 'array' for an array of results
// }
Client.prototype.query = function (config, values, callback) {
var query
var result
if (typeof config.submit === 'function') {
result = query = config
// accept query(new Query(...), (err, res) => { }) style
if (typeof values === 'function') {
config.callback = values
}
this._queryQueue.push(config)
this._pulseQueryQueue()
return config
} else {
query = new NativeQuery(config, values, callback)
if (!query.callback) {
let resolveOut, rejectOut
result = new Promise((resolve, reject) => {
resolveOut = resolve
rejectOut = reject
})
query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res)
}
}
var query = new NativeQuery(config, values, callback)
var result
if (!query.callback) {
let resolveOut, rejectOut
result = new Promise((resolve, reject) => {
resolveOut = resolve
rejectOut = reject
if (!this._queryable) {
query.native = this.native
process.nextTick(() => {
query.handleError(new Error('Client has encountered a connection error and is not queryable'))
})
query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res)
return result
}
if (this._ending) {
query.native = this.native
process.nextTick(() => {
query.handleError(new Error('Client was closed and is not queryable'))
})
return result
}
this._queryQueue.push(query)
this._pulseQueryQueue()
return result
@ -157,6 +188,9 @@ Client.prototype.query = function (config, values, callback) {
// disconnect from the backend server
Client.prototype.end = function (cb) {
var self = this
this._ending = true
if (!this._connected) {
this.once('connect', this.end.bind(this, cb))
}
@ -170,14 +204,12 @@ Client.prototype.end = function (cb) {
})
}
this.native.end(function () {
// send an error to the active query
if (self._hasActiveQuery()) {
var msg = 'Connection terminated'
self._queryQueue.length = 0
self._activeQuery.handleError(new Error(msg))
}
self.emit('end')
if (cb) cb()
self._errorAllQueries(new Error('Connection terminated'))
process.nextTick(() => {
self.emit('end')
if (cb) cb()
})
})
return result
}

View File

@ -146,22 +146,17 @@ Query.prototype.handleError = function (err, connection) {
Query.prototype.submit = function (connection) {
if (typeof this.text !== 'string' && typeof this.name !== 'string') {
const err = new Error('A query must have either text or a name. Supplying neither is unsupported.')
connection.emit('error', err)
connection.emit('readyForQuery')
return
return new Error('A query must have either text or a name. Supplying neither is unsupported.')
}
if (this.values && !Array.isArray(this.values)) {
const err = new Error('Query values must be an array')
connection.emit('error', err)
connection.emit('readyForQuery')
return
return new Error('Query values must be an array')
}
if (this.requiresPreparation()) {
this.prepare(connection)
} else {
connection.query(this.text)
}
return null
}
Query.prototype.hasBeenParsed = function (connection) {

View File

@ -50,6 +50,18 @@ suite.test('re-using connections results in promise rejection', (done) => {
})
})
suite.test('using a client after closing it results in error', (done) => {
const client = new Client()
client.connect(() => {
client.end(assert.calls(() => {
client.query('SELECT 1', assert.calls((err) => {
assert.equal(err.message, 'Client was closed and is not queryable')
done()
}))
}))
})
})
suite.test('query receives error on client shutdown', function (done) {
var client = new Client()
client.connect(assert.success(function () {
@ -139,6 +151,9 @@ suite.test('when connecting to an invalid host with callback', function (done) {
var client = new Client({
user: 'very invalid username'
})
client.on('error', () => {
assert.fail('unexpected error event when connecting')
})
client.connect(function (error, client) {
assert(error instanceof Error)
done()
@ -149,6 +164,9 @@ suite.test('when connecting to invalid host with promise', function (done) {
var client = new Client({
user: 'very invalid username'
})
client.on('error', () => {
assert.fail('unexpected error event when connecting')
})
client.connect().catch((e) => done())
})

View File

@ -2,9 +2,6 @@
var helper = require('./test-helper')
const pg = helper.pg
// make pool hold 2 clients
const pool = new pg.Pool({ max: 2 })
const suite = new helper.Suite()
suite.test('connecting to invalid port', (cb) => {
const pool = new pg.Pool({ port: 13801 })
@ -12,6 +9,8 @@ suite.test('connecting to invalid port', (cb) => {
})
suite.test('errors emitted on pool', (cb) => {
// make pool hold 2 clients
const pool = new pg.Pool({ max: 2 })
// get first client
pool.connect(assert.success(function (client, done) {
client.id = 1
@ -46,3 +45,57 @@ suite.test('errors emitted on pool', (cb) => {
})
}))
})
suite.test('connection-level errors cause queued queries to fail', (cb) => {
const pool = new pg.Pool()
pool.connect(assert.success((client, done) => {
client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => {
if (helper.args.native) {
assert.ok(err)
} else {
assert.equal(err.code, '57P01')
}
}))
pool.once('error', assert.calls((err, brokenClient) => {
assert.equal(client, brokenClient)
}))
client.query('SELECT 1', assert.calls((err) => {
if (helper.args.native) {
assert.ok(err)
} else {
assert.equal(err.message, 'Connection terminated unexpectedly')
}
done()
pool.end()
cb()
}))
}))
})
suite.test('connection-level errors cause future queries to fail', (cb) => {
const pool = new pg.Pool()
pool.connect(assert.success((client, done) => {
client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => {
if (helper.args.native) {
assert.ok(err)
} else {
assert.equal(err.code, '57P01')
}
}))
pool.once('error', assert.calls((err, brokenClient) => {
assert.equal(client, brokenClient)
client.query('SELECT 1', assert.calls((err) => {
assert.equal(err.message, 'Client has encountered a connection error and is not queryable')
done()
pool.end()
cb()
}))
}))
}))
})