mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
Move from 3 loops (prepareValue, check for buffers, write param types, write param values) to a single loop. This speeds up the insert benchmark by around 100 queries per second. Performance improvement depends on number of parameters being bound.
235 lines
6.3 KiB
JavaScript
235 lines
6.3 KiB
JavaScript
'use strict'
|
|
|
|
const { EventEmitter } = require('events')
|
|
|
|
const Result = require('./result')
|
|
const utils = require('./utils')
|
|
|
|
/**
 * Returns the query text recorded on `connection.parsedStatements` for `name`.
 *
 * A missing/empty name never matches, and only own properties are considered,
 * so an unnamed query cannot collide with a statement stored under the key
 * "undefined" and names such as "toString" or "constructor" do not resolve to
 * members inherited from Object.prototype.
 *
 * @param {object} connection - object exposing a `parsedStatements` map
 * @param {*} name - statement name to look up
 * @returns {*} the stored value, or undefined when nothing is stored
 */
function parsedTextFor(connection, name) {
  if (!name) {
    return undefined
  }
  const parsed = connection.parsedStatements
  return Object.prototype.hasOwnProperty.call(parsed, name) ? parsed[name] : undefined
}

/**
 * A single query submitted over a connection.
 *
 * The instance receives protocol messages through its `handle*` methods,
 * parses rows into a Result, and reports the outcome either to the supplied
 * callback or through the 'row', 'error' and 'end' events.
 */
class Query extends EventEmitter {
  /**
   * @param {*} config - passed, together with `values` and `callback`, to
   *   `utils.normalizeQueryConfig`; the normalized object supplies `text`,
   *   `values`, `rows`, `types`, `name`, `binary`, `portal`, `callback` and
   *   `rowMode`
   * @param {*} [values]
   * @param {*} [callback]
   */
  constructor(config, values, callback) {
    super()

    config = utils.normalizeQueryConfig(config, values, callback)

    this.text = config.text
    this.values = config.values
    this.rows = config.rows
    this.types = config.types
    this.name = config.name
    this.binary = config.binary
    // portal name used for bind/describe/execute; falls back to the empty string
    this.portal = config.portal || ''
    this.callback = config.callback
    this._rowMode = config.rowMode
    if (process.domain && config.callback) {
      // keep the callback attached to the domain that was active at creation
      this.callback = process.domain.bind(config.callback)
    }
    this._result = new Result(this._rowMode, this.types)

    // potential for multiple results: starts as the single current result and
    // is turned into an array by _checkForMultirow when a second one arrives
    this._results = this._result
    this.isPreparedStatement = false
    // false, or the error thrown while parsing a row
    this._canceledDueToError = false
    this._promise = null
  }

  /**
   * Decides whether submit() goes through prepare() or sends a plain query.
   *
   * @returns {boolean} true when the query has a name, a row limit, or at
   *   least one value; false for empty text or when there are no values
   */
  requiresPreparation() {
    // named queries must always be prepared
    if (this.name) {
      return true
    }
    // always prepare if there are max number of rows expected per
    // portal execution
    if (this.rows) {
      return true
    }
    // don't prepare empty text queries
    if (!this.text) {
      return false
    }
    // prepare if there are values
    if (!this.values) {
      return false
    }
    return this.values.length > 0
  }

  /**
   * Starts a fresh Result when the current one already carries a command.
   *
   * If we already have a result with a command property then we've already
   * executed one query in a multi-statement simple query, so the results are
   * turned into an array of results and a new current result is appended.
   */
  _checkForMultirow() {
    if (this._result.command) {
      if (!Array.isArray(this._results)) {
        this._results = [this._result]
      }
      this._result = new Result(this._rowMode, this.types)
      this._results.push(this._result)
    }
  }

  /**
   * Associates row metadata from the supplied message with this query object;
   * the metadata is used when parsing row results.
   *
   * @param {object} msg - message whose `fields` are added to the current result
   */
  handleRowDescription(msg) {
    this._checkForMultirow()
    this._result.addFields(msg.fields)
    // rows are kept on the result when a callback is set or when nothing is
    // listening for 'row' events
    this._accumulateRows = this.callback || !this.listeners('row').length
  }

  /**
   * Parses one data row, emits it as 'row' and optionally stores it.
   *
   * A parse failure is remembered in `_canceledDueToError`; every later row is
   * then ignored and the error is reported from handleReadyForQuery/handleError.
   *
   * @param {object} msg - message whose `fields` are passed to `Result#parseRow`
   */
  handleDataRow(msg) {
    let row

    if (this._canceledDueToError) {
      return
    }

    try {
      row = this._result.parseRow(msg.fields)
    } catch (err) {
      this._canceledDueToError = err
      return
    }

    this.emit('row', row, this._result)
    if (this._accumulateRows) {
      this._result.addRow(row)
    }
  }

  /**
   * Records a command-complete message on the current result.
   *
   * @param {object} msg - forwarded to `Result#addCommandComplete`
   * @param {object} connection - synced when a row limit is in use
   */
  handleCommandComplete(msg, connection) {
    this._checkForMultirow()
    this._result.addCommandComplete(msg)
    // need to sync after each command complete of a prepared statement
    // if we were using a row count which results in multiple calls to _getRows
    if (this.rows) {
      connection.sync()
    }
  }

  /**
   * Handles the empty-query message.
   *
   * If a named prepared statement is created with empty query text the
   * backend will send an emptyQuery message but *not* a command complete
   * message. Since we pipeline sync immediately after execute we don't need to
   * do anything here unless we have rows specified, in which case we did not
   * pipeline the initial sync call.
   *
   * @param {object} connection
   */
  handleEmptyQuery(connection) {
    if (this.rows) {
      connection.sync()
    }
  }

  /**
   * Reports a failure to the callback, or as an 'error' event without one.
   *
   * A stored row-parsing error takes precedence over `err` and is cleared once
   * it has been picked up.
   *
   * @param {Error} err
   * @param {object} connection - not used by this implementation
   * @returns {*} the callback's return value when a callback is set
   */
  handleError(err, connection) {
    if (this._canceledDueToError) {
      err = this._canceledDueToError
      this._canceledDueToError = false
    }
    // if callback supplied do not emit error event as uncaught error
    // events will bubble up to node process
    if (this.callback) {
      return this.callback(err)
    }
    this.emit('error', err)
  }

  /**
   * Finishes the query: reports a stored row-parsing error through
   * handleError, otherwise passes the results to the callback (if any) and
   * emits 'end'.
   *
   * @param {object} con - connection, forwarded to handleError
   */
  handleReadyForQuery(con) {
    if (this._canceledDueToError) {
      return this.handleError(this._canceledDueToError, con)
    }
    if (this.callback) {
      this.callback(null, this._results)
    }
    this.emit('end', this._results)
  }

  /**
   * Validates the query and sends it over `connection`.
   *
   * Validation problems are returned rather than thrown.
   *
   * @param {object} connection
   * @returns {Error|null} an Error describing why nothing was sent, or null
   */
  submit(connection) {
    if (typeof this.text !== 'string' && typeof this.name !== 'string') {
      return new Error('A query must have either text or a name. Supplying neither is unsupported.')
    }
    // only a named query can clash with an earlier statement of the same name
    const previous = parsedTextFor(connection, this.name)
    if (this.text && previous && this.text !== previous) {
      return new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`)
    }
    if (this.values && !Array.isArray(this.values)) {
      return new Error('Query values must be an array')
    }
    if (this.requiresPreparation()) {
      this.prepare(connection)
    } else {
      connection.query(this.text)
    }
    return null
  }

  /**
   * @param {object} connection
   * @returns {*} truthy when this query is named and that name has an entry in
   *   `connection.parsedStatements`; falsy otherwise
   */
  hasBeenParsed(connection) {
    return this.name && parsedTextFor(connection, this.name)
  }

  /**
   * Requests the next batch of rows after the portal was suspended.
   *
   * @param {object} connection
   */
  handlePortalSuspended(connection) {
    this._getRows(connection, this.rows)
  }

  /**
   * Sends an execute for this query's portal, followed by sync or flush.
   *
   * @param {object} connection
   * @param {*} rows - row limit handed to execute; falsy means no paging
   */
  _getRows(connection, rows) {
    connection.execute({
      portal: this.portal,
      rows: rows,
    })
    if (!rows) {
      // if we're not reading pages of rows send the sync command
      // to indicate the pipeline is finished
      connection.sync()
    } else {
      // otherwise flush the call out to read more rows
      connection.flush()
    }
  }

  /**
   * Sends parse (unless already parsed), bind, describe and execute.
   *
   * http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
   *
   * @param {object} connection
   */
  prepare(connection) {
    // prepared statements need sync to be called after each command
    // complete or when an error is encountered
    this.isPreparedStatement = true

    // TODO refactor this poor encapsulation
    if (!this.hasBeenParsed(connection)) {
      connection.parse({
        text: this.text,
        name: this.name,
        types: this.types,
      })
    }

    // because we're mapping user supplied values to
    // postgres wire protocol compatible values it could
    // throw an exception, so try/catch this section
    try {
      connection.bind({
        portal: this.portal,
        statement: this.name,
        values: this.values,
        binary: this.binary,
        valueMapper: utils.prepareValue,
      })
    } catch (err) {
      // report the failure and do not send describe/execute
      this.handleError(err, connection)
      return
    }

    connection.describe({
      type: 'P',
      name: this.portal || '',
    })

    this._getRows(connection, this.rows)
  }

  /**
   * A plain query has no data source for COPY FROM, so the copy is failed.
   *
   * @param {object} connection
   */
  handleCopyInResponse(connection) {
    connection.sendCopyFail('No source stream defined')
  }

  /**
   * Copy data is ignored by a plain query.
   */
  // eslint-disable-next-line no-unused-vars
  handleCopyData(msg, connection) {
    // noop
  }
}
|
|
|
|
// The Query class is this module's only export.
module.exports = Query
|