mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
* Add tests for query callbacks after connection-level errors * Ensure callbacks are executed for all queued queries after connection-level errors Separates socket errors from error messages, sends socket errors to all queries in the queue, marks clients as unusable after socket errors. This is not very pleasant but should maintain backwards compatibility…? * Always call `handleError` asynchronously This doesn’t match the original behaviour of the type errors, but it’s correct. * Fix return value of `Client.prototype.query` in immediate error cases * Mark clients with closed connections as unusable consistently * Add tests for error event when connecting Client * Ensure the promise and callback versions of Client#connect always have the same behaviour * Give same error to queued queries as to active query when ending and do so in the native Client as well. * Restore original ordering between queued query callbacks and 'end' event
220 lines
5.9 KiB
JavaScript
220 lines
5.9 KiB
JavaScript
'use strict'
|
|
/**
|
|
* Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com)
|
|
* All rights reserved.
|
|
*
|
|
* This source code is licensed under the MIT license found in the
|
|
* README.md file in the root directory of this source tree.
|
|
*/
|
|
|
|
var EventEmitter = require('events').EventEmitter
|
|
var util = require('util')
|
|
|
|
var Result = require('./result')
|
|
var utils = require('./utils')
|
|
|
|
// Represents a single query submitted over a connection. The three arguments
// are handed to utils.normalizeQueryConfig, and the resulting config object
// supplies the text, values, statement name, portal, callback and row mode.
// Query is an EventEmitter: it emits 'row', 'end' and 'error'.
var Query = function (config, values, callback) {
  // calling Query(...) without "new" still yields a Query instance
  if (!(this instanceof Query)) {
    return new Query(config, values, callback)
  }

  var opts = utils.normalizeQueryConfig(config, values, callback)

  this.text = opts.text
  this.values = opts.values
  this.rows = opts.rows
  this.types = opts.types
  this.name = opts.name
  this.binary = opts.binary
  // falls back to the empty portal name when none is configured
  this.portal = opts.portal || ''
  // when a domain is active, run the callback inside that domain
  this.callback = (process.domain && opts.callback)
    ? process.domain.bind(opts.callback)
    : opts.callback
  this._rowMode = opts.rowMode

  // _result is the result currently being filled; _results is what gets
  // handed to the caller and becomes an array for multi-statement queries
  this._result = new Result(this._rowMode, this.types)
  this._results = this._result

  this.isPreparedStatement = false
  // holds the row-parsing error (if any) until the query can be failed
  this._canceledDueToError = false
  this._promise = null

  EventEmitter.call(this)
}

util.inherits(Query, EventEmitter)
|
|
|
|
// Decides whether this query has to go through prepare() instead of being
// sent as plain query text. Returns a boolean.
Query.prototype.requiresPreparation = function () {
  // a statement name, or a maximum row count per portal execution,
  // always forces preparation
  if (this.name || this.rows) {
    return true
  }
  // without query text or without a values list there is nothing to prepare
  if (!this.text || !this.values) {
    return false
  }
  // otherwise prepare only when at least one value was supplied
  return this.values.length > 0
}
|
|
|
|
// Starts a fresh Result when the current one already carries a command,
// which means an earlier statement of a multi-statement simple query has
// completed. From then on _results is an array holding every Result.
Query.prototype._checkForMultirow = function () {
  if (!this._result.command) {
    // still working on the first (or only) result
    return
  }
  if (!Array.isArray(this._results)) {
    // first time we see a second result: promote the single result to a list
    this._results = [this._result]
  }
  this._result = new Result(this._rowMode, this.types)
  this._results.push(this._result)
}
|
|
|
|
// Records the field metadata carried by the row description message on the
// current result; that metadata is what later row parsing relies on.
// Also decides whether rows should be collected on the result object.
Query.prototype.handleRowDescription = function (msg) {
  this._checkForMultirow()
  this._result.addFields(msg.fields)
  if (this.callback) {
    // a callback expects the complete result, so rows must be kept
    this._accumulateRows = this.callback
  } else {
    // no callback: keep rows only when nobody listens for 'row' events
    this._accumulateRows = !this.listeners('row').length
  }
}
|
|
|
|
// Parses one data row message, emits it as a 'row' event and, when rows are
// being accumulated, appends it to the current result.
Query.prototype.handleDataRow = function (msg) {
  // a previous row failed to parse: ignore everything that follows
  if (this._canceledDueToError) {
    return
  }

  var parsedRow
  try {
    parsedRow = this._result.parseRow(msg.fields)
  } catch (parseError) {
    // keep the error; handleReadyForQuery reports it through handleError
    this._canceledDueToError = parseError
    return
  }

  this.emit('row', parsedRow, this._result)
  if (this._accumulateRows) {
    this._result.addRow(parsedRow)
  }
}
|
|
|
|
// Stores the command-complete message on the current result (starting a new
// result first if this is a further statement of a multi-statement query).
Query.prototype.handleCommandComplete = function (msg, con) {
  this._checkForMultirow()
  this._result.addCommandComplete(msg)
  if (!this.isPreparedStatement) {
    return
  }
  // a prepared statement needs a sync after every command complete
  con.sync()
}
|
|
|
|
// When a named prepared statement is created with empty query text, the
// backend sends an emptyQuery message but *not* a command complete message.
// Execution on the connection would then hang until the backend receives a
// sync message, so one is sent here for prepared statements.
Query.prototype.handleEmptyQuery = function (con) {
  if (!this.isPreparedStatement) {
    return
  }
  con.sync()
}
|
|
|
|
// Finishes the query: either fails it with a row-parsing error that was
// stored earlier, or delivers the results to the callback (if any) and then
// emits 'end' with the same results.
Query.prototype.handleReadyForQuery = function (con) {
  var deferredError = this._canceledDueToError
  if (deferredError) {
    return this.handleError(deferredError, con)
  }
  if (this.callback) {
    this.callback(null, this._results)
  }
  this.emit('end', this._results)
}
|
|
|
|
// Fails the query with an error. A stored row-parsing error takes precedence
// over the error passed in. The error goes to the callback when one exists
// (its return value is returned); otherwise it is emitted as 'error'.
Query.prototype.handleError = function (err, connection) {
  // an error during a prepared statement must be followed by a sync
  if (this.isPreparedStatement) {
    connection.sync()
  }

  var reported = err
  if (this._canceledDueToError) {
    reported = this._canceledDueToError
    this._canceledDueToError = false
  }

  if (!this.callback) {
    // only emit when no callback was supplied: an 'error' event without a
    // listener would bubble up to the node process as an uncaught error
    this.emit('error', reported)
    return
  }
  return this.callback(reported)
}
|
|
|
|
// Validates the query and sends it over the given connection, either through
// prepare() or as plain query text. Returns an Error instance (it is not
// thrown) when the query is invalid, and null once it has been sent.
Query.prototype.submit = function (connection) {
  if (!(typeof this.text === 'string' || typeof this.name === 'string')) {
    var missingTextError = new Error('A query must have either text or a name. Supplying neither is unsupported.')
    return missingTextError
  }
  if (this.values && !Array.isArray(this.values)) {
    var valuesTypeError = new Error('Query values must be an array')
    return valuesTypeError
  }

  if (!this.requiresPreparation()) {
    connection.query(this.text)
    return null
  }
  this.prepare(connection)
  return null
}
|
|
|
|
// Looks up this query's statement name in connection.parsedStatements.
// Returns the (falsy) name itself for unnamed queries, otherwise whatever
// the connection has stored under that name.
Query.prototype.hasBeenParsed = function (connection) {
  var statementName = this.name
  if (!statementName) {
    return statementName
  }
  return connection.parsedStatements[statementName]
}
|
|
|
|
// Called when the portal was suspended: requests the next batch of rows,
// using the same row limit (this.rows) as before.
Query.prototype.handlePortalSuspended = function (connection) {
  var batchSize = this.rows
  this._getRows(connection, batchSize)
}
|
|
|
|
// Asks the connection to execute this query's portal with the given row
// limit, then flushes the connection.
Query.prototype._getRows = function (connection, rows) {
  var executeOptions = {
    portal: this.portal,
    rows: rows
  }
  connection.execute(executeOptions, true)
  connection.flush()
}
|
|
|
|
// Sends this query through the extended query flow on the given connection:
// parse (unless the named statement is already known to the connection),
// bind, describe the portal, then execute via _getRows.
// http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
Query.prototype.prepare = function (connection) {
  // from here on a sync has to follow every command complete and every
  // error, see handleCommandComplete / handleError
  this.isPreparedStatement = true

  // TODO refactor this poor encapsulation
  if (!this.hasBeenParsed(connection)) {
    var parseOptions = {
      text: this.text,
      name: this.name,
      types: this.types
    }
    connection.parse(parseOptions, true)
  }

  // the stored values are replaced by their prepared form
  if (this.values) {
    this.values = this.values.map(utils.prepareValue)
  }

  var bindOptions = {
    portal: this.portal,
    statement: this.name,
    values: this.values,
    binary: this.binary
  }
  connection.bind(bindOptions, true)

  var describeOptions = {
    type: 'P',
    name: this.portal || ''
  }
  connection.describe(describeOptions, true)

  this._getRows(connection, this.rows)
}
|
|
|
|
// A copy-in response is always answered with a copy failure, because this
// query type has no source stream to read the data from.
Query.prototype.handleCopyInResponse = function (connection) {
  var reason = 'No source stream defined'
  connection.sendCopyFail(reason)
}
|
|
|
|
// Copy data messages are deliberately ignored: both arguments are accepted
// and nothing is done with them.
Query.prototype.handleCopyData = function (msg, connection) {
  // intentionally empty
}
|
|
// The Query constructor is the module's only export.
module.exports = Query
|