node-postgres/lib/query.js
Charmander 3828aa8608 Queued query errors (#1503)
* Add tests for query callbacks after connection-level errors

* Ensure callbacks are executed for all queued queries after connection-level errors

Separates socket errors from error messages, sends socket errors to all queries in the queue, marks clients as unusable after socket errors.

This is not very pleasant but should maintain backwards compatibility…?

* Always call `handleError` asynchronously

This doesn’t match the original behaviour of the type errors, but it’s correct.

* Fix return value of `Client.prototype.query` in immediate error cases

* Mark clients with closed connections as unusable consistently

* Add tests for error event when connecting Client

* Ensure the promise and callback versions of Client#connect always have the same behaviour

* Give same error to queued queries as to active query when ending

and do so in the native Client as well.

* Restore original ordering between queued query callbacks and 'end' event
2018-10-03 10:37:15 -05:00

220 lines
5.9 KiB
JavaScript

'use strict'
/**
* Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com)
* All rights reserved.
*
* This source code is licensed under the MIT license found in the
* README.md file in the root directory of this source tree.
*/
var EventEmitter = require('events').EventEmitter
var util = require('util')
var Result = require('./result')
var utils = require('./utils')
var Query = function (config, values, callback) {
// use of "new" optional
if (!(this instanceof Query)) { return new Query(config, values, callback) }
config = utils.normalizeQueryConfig(config, values, callback)
this.text = config.text
this.values = config.values
this.rows = config.rows
this.types = config.types
this.name = config.name
this.binary = config.binary
// use unique portal name each time
this.portal = config.portal || ''
this.callback = config.callback
this._rowMode = config.rowMode
if (process.domain && config.callback) {
this.callback = process.domain.bind(config.callback)
}
this._result = new Result(this._rowMode, this.types)
// potential for multiple results
this._results = this._result
this.isPreparedStatement = false
this._canceledDueToError = false
this._promise = null
EventEmitter.call(this)
}
util.inherits(Query, EventEmitter)
Query.prototype.requiresPreparation = function () {
// named queries must always be prepared
if (this.name) { return true }
// always prepare if there are max number of rows expected per
// portal execution
if (this.rows) { return true }
// don't prepare empty text queries
if (!this.text) { return false }
// prepare if there are values
if (!this.values) { return false }
return this.values.length > 0
}
Query.prototype._checkForMultirow = function () {
// if we already have a result with a command property
// then we've already executed one query in a multi-statement simple query
// turn our results into an array of results
if (this._result.command) {
if (!Array.isArray(this._results)) {
this._results = [this._result]
}
this._result = new Result(this._rowMode, this.types)
this._results.push(this._result)
}
}
// associates row metadata from the supplied
// message with this query object
// metadata used when parsing row results
Query.prototype.handleRowDescription = function (msg) {
this._checkForMultirow()
this._result.addFields(msg.fields)
this._accumulateRows = this.callback || !this.listeners('row').length
}
Query.prototype.handleDataRow = function (msg) {
var row
if (this._canceledDueToError) {
return
}
try {
row = this._result.parseRow(msg.fields)
} catch (err) {
this._canceledDueToError = err
return
}
this.emit('row', row, this._result)
if (this._accumulateRows) {
this._result.addRow(row)
}
}
Query.prototype.handleCommandComplete = function (msg, con) {
this._checkForMultirow()
this._result.addCommandComplete(msg)
// need to sync after each command complete of a prepared statement
if (this.isPreparedStatement) {
con.sync()
}
}
// if a named prepared statement is created with empty query text
// the backend will send an emptyQuery message but *not* a command complete message
// execution on the connection will hang until the backend receives a sync message
Query.prototype.handleEmptyQuery = function (con) {
if (this.isPreparedStatement) {
con.sync()
}
}
Query.prototype.handleReadyForQuery = function (con) {
if (this._canceledDueToError) {
return this.handleError(this._canceledDueToError, con)
}
if (this.callback) {
this.callback(null, this._results)
}
this.emit('end', this._results)
}
Query.prototype.handleError = function (err, connection) {
// need to sync after error during a prepared statement
if (this.isPreparedStatement) {
connection.sync()
}
if (this._canceledDueToError) {
err = this._canceledDueToError
this._canceledDueToError = false
}
// if callback supplied do not emit error event as uncaught error
// events will bubble up to node process
if (this.callback) {
return this.callback(err)
}
this.emit('error', err)
}
Query.prototype.submit = function (connection) {
if (typeof this.text !== 'string' && typeof this.name !== 'string') {
return new Error('A query must have either text or a name. Supplying neither is unsupported.')
}
if (this.values && !Array.isArray(this.values)) {
return new Error('Query values must be an array')
}
if (this.requiresPreparation()) {
this.prepare(connection)
} else {
connection.query(this.text)
}
return null
}
Query.prototype.hasBeenParsed = function (connection) {
return this.name && connection.parsedStatements[this.name]
}
Query.prototype.handlePortalSuspended = function (connection) {
this._getRows(connection, this.rows)
}
Query.prototype._getRows = function (connection, rows) {
connection.execute({
portal: this.portal,
rows: rows
}, true)
connection.flush()
}
Query.prototype.prepare = function (connection) {
var self = this
// prepared statements need sync to be called after each command
// complete or when an error is encountered
this.isPreparedStatement = true
// TODO refactor this poor encapsulation
if (!this.hasBeenParsed(connection)) {
connection.parse({
text: self.text,
name: self.name,
types: self.types
}, true)
}
if (self.values) {
self.values = self.values.map(utils.prepareValue)
}
// http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
connection.bind({
portal: self.portal,
statement: self.name,
values: self.values,
binary: self.binary
}, true)
connection.describe({
type: 'P',
name: self.portal || ''
}, true)
this._getRows(connection, this.rows)
}
Query.prototype.handleCopyInResponse = function (connection) {
connection.sendCopyFail('No source stream defined')
}
Query.prototype.handleCopyData = function (msg, connection) {
// noop
}
module.exports = Query