mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
* Add tests for query callbacks after connection-level errors
* Ensure callbacks are executed for all queued queries after connection-level errors
  Separates socket errors from error messages, sends socket errors to all queries in the queue, and marks clients as unusable after socket errors. This is not very pleasant but should maintain backwards compatibility…?
* Always call `handleError` asynchronously
  This doesn’t match the original behaviour of the type errors, but it’s correct.
* Fix return value of `Client.prototype.query` in immediate error cases
* Mark clients with closed connections as unusable consistently
* Add tests for error event when connecting Client
* Ensure the promise and callback versions of Client#connect always have the same behaviour
* Give same error to queued queries as to active query when ending, and do so in the native Client as well.
* Restore original ordering between queued query callbacks and 'end' event
259 lines
6.4 KiB
JavaScript
259 lines
6.4 KiB
JavaScript
'use strict'
|
|
/**
|
|
* Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com)
|
|
* All rights reserved.
|
|
*
|
|
* This source code is licensed under the MIT license found in the
|
|
* README.md file in the root directory of this source tree.
|
|
*/
|
|
|
|
var Native = require('pg-native')
|
|
var TypeOverrides = require('../type-overrides')
|
|
var semver = require('semver')
|
|
var pkg = require('../../package.json')
|
|
var assert = require('assert')
|
|
var EventEmitter = require('events').EventEmitter
|
|
var util = require('util')
|
|
var ConnectionParameters = require('../connection-parameters')
|
|
|
|
// Fail at load time when the installed pg-native is older than the minimum
// version declared in package.json (`minNativeVersion`).
var msg = `Version >= ${pkg.minNativeVersion} of pg-native required.`
assert(
  semver.gte(Native.version, pkg.minNativeVersion),
  msg
)
|
|
|
|
var NativeQuery = require('./query')
|
|
|
|
/**
 * Client backed by the `pg-native` bindings.
 *
 * The instance is initialised as an EventEmitter and starts out
 * disconnected, with an empty query queue.
 *
 * @param {Object} [config] - connection settings; passed whole to
 *   ConnectionParameters, while `config.types` is passed to TypeOverrides.
 */
var Client = module.exports = function (config) {
  EventEmitter.call(this)

  if (!config) {
    config = {}
  }

  // per-client type parsers, shared with the native binding
  this._types = new TypeOverrides(config.types)
  this.native = new Native({ types: this._types })

  // queries waiting for their turn on the connection
  this._queryQueue = []

  // lifecycle flags
  this._ending = false
  this._connecting = false
  this._connected = false
  // set to false by the native 'error' handler installed in _connect
  this._queryable = true

  // keep these on the object for legacy reasons
  // for the time being. TODO: deprecate all this jazz
  const params = new ConnectionParameters(config)
  this.connectionParameters = params
  this.user = params.user
  this.password = params.password
  this.database = params.database
  this.host = params.host
  this.port = params.port

  // a hash to hold named queries
  this.namedQueries = {}
}

// expose the query class used by this client
Client.Query = NativeQuery

// Client instances are event emitters
util.inherits(Client, EventEmitter)
|
|
|
|
/**
 * Fail the active query (when one is in flight) and every queued query
 * with the same error.
 *
 * The active-query slot and the queue are cleared before this returns;
 * each query's `handleError` is invoked later via process.nextTick, active
 * query first, then queued queries in queue order.
 *
 * @param {Error} err - error handed to every affected query
 */
Client.prototype._errorAllQueries = function (err) {
  const inFlight = []

  if (this._hasActiveQuery()) {
    inFlight.push(this._activeQuery)
    this._activeQuery = null
  }

  // splice(0) empties the queue in place and hands back what was in it
  const doomed = inFlight.concat(this._queryQueue.splice(0))

  doomed.forEach((query) => {
    process.nextTick(() => {
      query.native = this.native
      query.handleError(err)
    })
  })
}
|
|
|
|
// Connect to the backend.
//
// `cb` is called with no arguments once the native connection is up, or
// with an error when the connection string cannot be built or the native
// connect fails. A second call on the same client reports an error to `cb`
// on the next tick instead of connecting again.
Client.prototype._connect = function (cb) {
  if (this._connecting) {
    process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))
    return
  }

  this._connecting = true

  // handle connection errors from the native layer: the client stops
  // accepting queries, every pending query is failed, and the error is
  // re-emitted on the client
  const onNativeError = (err) => {
    this._queryable = false
    this._errorAllQueries(err)
    this.emit('error', err)
  }

  // translate native notifications into the client's notification shape
  const onNotification = (msg) => {
    this.emit('notification', {
      channel: msg.relname,
      payload: msg.extra
    })
  }

  this.connectionParameters.getLibpqConnectionString((conStringError, conString) => {
    if (conStringError) return cb(conStringError)

    this.native.connect(conString, (connectError) => {
      if (connectError) return cb(connectError)

      // set internal states to connected
      this._connected = true

      this.native.on('error', onNativeError)
      this.native.on('notification', onNotification)

      // signal we are connected now, then start on anything queued
      // before the connection was ready
      this.emit('connect')
      this._pulseQueryQueue(true)

      cb()
    })
  })
}
|
|
|
|
/**
 * Connect the client.
 *
 * @param {Function} [callback] - called with an error, or with no
 *   arguments on success. When given, nothing is returned.
 * @returns {Promise|undefined} when no callback is given, a promise that
 *   resolves with no value on success and rejects with the connection error.
 */
Client.prototype.connect = function (callback) {
  if (!callback) {
    return new Promise((resolve, reject) => {
      this._connect((error) => error ? reject(error) : resolve())
    })
  }

  this._connect(callback)
}
|
|
|
|
// send a query to the server
// this method is highly overloaded to take
// 1) string query, optional array of parameters, optional function callback
// 2) object query with {
//    string query
//    optional array values,
//    optional function callback instead of as a separate parameter
//    optional string name to name & cache the query plan
//    optional string rowMode = 'array' for an array of results
//  }
// 3) an already-built query object (anything with a `submit` method),
//    optionally followed by a function callback
//
// Returns the query object itself for style 3. For styles 1 and 2 it
// returns a Promise when no callback was supplied and undefined otherwise.
//
// Throws a TypeError synchronously when `config` is null or undefined.
// All other failures detected here (client unusable after a connection
// error, client already ending) are reported through the query's
// `handleError` on a later tick, never synchronously.
Client.prototype.query = function (config, values, callback) {
  // Fail with a descriptive message rather than the engine's generic
  // property-access TypeError on `config.submit`.
  if (config === null || config === undefined) {
    throw new TypeError('Client was passed a null or undefined query')
  }

  var query
  var result

  if (typeof config.submit === 'function') {
    result = query = config
    // accept query(new Query(...), (err, res) => { }) style
    if (typeof values === 'function') {
      config.callback = values
    }
  } else {
    query = new NativeQuery(config, values, callback)
    if (!query.callback) {
      // no callback supplied: expose the outcome as a promise instead
      result = new Promise((resolve, reject) => {
        query.callback = (err, res) => err ? reject(err) : resolve(res)
      })
    }
  }

  // Hand the query an error on a later tick, so its callback never fires
  // before query() has returned.
  const failLater = (message) => {
    query.native = this.native
    process.nextTick(() => {
      query.handleError(new Error(message))
    })
    return result
  }

  if (!this._queryable) {
    return failLater('Client has encountered a connection error and is not queryable')
  }

  if (this._ending) {
    return failLater('Client was closed and is not queryable')
  }

  this._queryQueue.push(query)
  this._pulseQueryQueue()
  return result
}
|
|
|
|
// Disconnect from the backend server.
//
// Marks the client as ending, closes the native connection, fails the
// active and queued queries with 'Connection terminated', and then, on a
// later tick, emits 'end' and invokes the callback.
//
// Returns a Promise when no callback is given, undefined otherwise.
Client.prototype.end = function (cb) {
  this._ending = true

  // NOTE(review): when the client is not connected yet, another end() is
  // scheduled for the 'connect' event, but execution still falls through and
  // closes the native handle right away; confirm this is intended.
  if (!this._connected) {
    this.once('connect', this.end.bind(this, cb))
  }

  let result
  if (!cb) {
    // no callback supplied: adapt the completion callback to a promise
    result = new global.Promise((resolve, reject) => {
      cb = (err) => err ? reject(err) : resolve()
    })
  }

  this.native.end(() => {
    this._errorAllQueries(new Error('Connection terminated'))

    // deferred so that 'end' and the callback are scheduled after the
    // query failures queued just above
    process.nextTick(() => {
      this.emit('end')
      if (cb) cb()
    })
  })

  return result
}
|
|
|
|
/**
 * Report whether a query currently occupies the connection.
 *
 * @returns {*} the falsy `_activeQuery` value itself when no query is set;
 *   otherwise `true` unless that query's state is 'error' or 'end'.
 */
Client.prototype._hasActiveQuery = function () {
  const active = this._activeQuery
  if (!active) {
    return active
  }
  const state = active.state
  return state !== 'error' && state !== 'end'
}
|
|
|
|
/**
 * Start the next queued query if the connection is free.
 *
 * Does nothing while disconnected or while another query is active. When
 * the queue is empty, 'drain' is emitted unless this is the pulse issued
 * right after connecting.
 *
 * @param {boolean} [initialConnection] - truthy for the pulse issued from
 *   the connect path; suppresses the 'drain' event.
 */
Client.prototype._pulseQueryQueue = function (initialConnection) {
  if (!this._connected || this._hasActiveQuery()) {
    return
  }

  const next = this._queryQueue.shift()

  if (next) {
    this._activeQuery = next
    next.submit(this)
    // move on to the following query once this one reports completion
    next.once('_done', () => {
      this._pulseQueryQueue()
    })
    return
  }

  if (!initialConnection) {
    this.emit('drain')
  }
}
|
|
|
|
// Attempt to cancel a query.
// The active query is cancelled through the native binding; a query that
// is still waiting in the queue is simply removed from it. Any other query
// is ignored.
Client.prototype.cancel = function (query) {
  if (this._activeQuery === query) {
    this.native.cancel(function () {})
    return
  }

  const position = this._queryQueue.indexOf(query)
  if (position !== -1) {
    this._queryQueue.splice(position, 1)
  }
}
|
|
|
|
/**
 * Register a type parser on this client's TypeOverrides instance.
 * Arguments are forwarded unchanged and the delegate's result is returned.
 */
Client.prototype.setTypeParser = function (oid, format, parseFn) {
  const overrides = this._types
  return overrides.setTypeParser(oid, format, parseFn)
}
|
|
|
|
/**
 * Look up a type parser on this client's TypeOverrides instance.
 * Arguments are forwarded unchanged and the delegate's result is returned.
 */
Client.prototype.getTypeParser = function (oid, format) {
  const overrides = this._types
  return overrides.getTypeParser(oid, format)
}
|