From 04e5297d2ea5b45b32e01edaff97a7bd29ba6229 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 10:33:33 -0500 Subject: [PATCH] Convert more things to ES6 classes --- packages/pg/lib/client.js | 1029 +++++++++++----------- packages/pg/lib/connection-parameters.js | 202 ++--- packages/pg/lib/result.js | 225 +++-- 3 files changed, 768 insertions(+), 688 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 93dfc6c9..fd9ecad1 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -19,388 +19,89 @@ var Query = require('./query') var defaults = require('./defaults') var Connection = require('./connection') -var Client = function (config) { - EventEmitter.call(this) +class Client extends EventEmitter { + constructor(config) { + super() - this.connectionParameters = new ConnectionParameters(config) - this.user = this.connectionParameters.user - this.database = this.connectionParameters.database - this.port = this.connectionParameters.port - this.host = this.connectionParameters.host + this.connectionParameters = new ConnectionParameters(config) + this.user = this.connectionParameters.user + this.database = this.connectionParameters.database + this.port = this.connectionParameters.port + this.host = this.connectionParameters.host - // "hiding" the password so it doesn't show up in stack traces - // or if the client is console.logged - Object.defineProperty(this, 'password', { - configurable: true, - enumerable: false, - writable: true, - value: this.connectionParameters.password, - }) - - this.replication = this.connectionParameters.replication - - var c = config || {} - - this._Promise = c.Promise || global.Promise - this._types = new TypeOverrides(c.types) - this._ending = false - this._connecting = false - this._connected = false - this._connectionError = false - this._queryable = true - - this.connection = - c.connection || - new Connection({ - stream: c.stream, - ssl: this.connectionParameters.ssl, - keepAlive: c.keepAlive || false, - keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, - encoding: this.connectionParameters.client_encoding || 'utf8', + // "hiding" the password so it doesn't show up in stack traces + // or if the client is console.logged + Object.defineProperty(this, 'password', { + configurable: true, + enumerable: false, + writable: true, + value: this.connectionParameters.password, }) - this.queryQueue = [] - this.binary = c.binary || defaults.binary - this.processID = null - this.secretKey = null - this.ssl = this.connectionParameters.ssl || false - this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 -} -util.inherits(Client, EventEmitter) + this.replication = this.connectionParameters.replication -Client.prototype._errorAllQueries = function (err) { - const enqueueError = (query) => { - process.nextTick(() => { - query.handleError(err, this.connection) - }) + var c = config || {} + + this._Promise = c.Promise || global.Promise + this._types = new TypeOverrides(c.types) + this._ending = false + this._connecting = false + this._connected = false + this._connectionError = false + this._queryable = true + + this.connection = + c.connection || + new Connection({ + stream: c.stream, + ssl: this.connectionParameters.ssl, + keepAlive: c.keepAlive || false, + keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, + encoding: this.connectionParameters.client_encoding || 'utf8', + }) + this.queryQueue = [] + this.binary = c.binary || defaults.binary + this.processID = null + this.secretKey = null + this.ssl = this.connectionParameters.ssl || false + this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 } - if (this.activeQuery) { - enqueueError(this.activeQuery) - this.activeQuery = null - } - - this.queryQueue.forEach(enqueueError) - this.queryQueue.length = 0 -} - -Client.prototype._connect = function (callback) { - var self = this - var con = this.connection - if (this._connecting || this._connected) { - const err = new Error('Client has already been connected. You cannot reuse a client.') - process.nextTick(() => { - callback(err) - }) - return - } - this._connecting = true - - var connectionTimeoutHandle - if (this._connectionTimeoutMillis > 0) { - connectionTimeoutHandle = setTimeout(() => { - con._ending = true - con.stream.destroy(new Error('timeout expired')) - }, this._connectionTimeoutMillis) - } - - if (this.host && this.host.indexOf('/') === 0) { - con.connect(this.host + '/.s.PGSQL.' + this.port) - } else { - con.connect(this.port, this.host) - } - - // once connection is established send startup message - con.on('connect', function () { - if (self.ssl) { - con.requestSsl() - } else { - con.startup(self.getStartupConf()) - } - }) - - con.on('sslconnect', function () { - con.startup(self.getStartupConf()) - }) - - function checkPgPass(cb) { - return function (msg) { - if (typeof self.password === 'function') { - self._Promise - .resolve() - .then(() => self.password()) - .then((pass) => { - if (pass !== undefined) { - if (typeof pass !== 'string') { - con.emit('error', new TypeError('Password must be a string')) - return - } - self.connectionParameters.password = self.password = pass - } else { - self.connectionParameters.password = self.password = null - } - cb(msg) - }) - .catch((err) => { - con.emit('error', err) - }) - } else if (self.password !== null) { - cb(msg) - } else { - pgPass(self.connectionParameters, function (pass) { - if (undefined !== pass) { - self.connectionParameters.password = self.password = pass - } - cb(msg) - }) - } - } - } - - // password request handling - con.on( - 'authenticationCleartextPassword', - checkPgPass(function () { - con.password(self.password) - }) - ) - - // password request handling - con.on( - 'authenticationMD5Password', - checkPgPass(function (msg) { - con.password(utils.postgresMd5PasswordHash(self.user, self.password, msg.salt)) - }) - ) - - // password request handling (SASL) - var saslSession - con.on( - 'authenticationSASL', - checkPgPass(function (msg) { - saslSession = sasl.startSession(msg.mechanisms) - - con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) - }) - ) - - // password request handling (SASL) - con.on('authenticationSASLContinue', function (msg) { - sasl.continueSession(saslSession, self.password, msg.data) - - con.sendSCRAMClientFinalMessage(saslSession.response) - }) - - // password request handling (SASL) - con.on('authenticationSASLFinal', function (msg) { - sasl.finalizeSession(saslSession, msg.data) - - saslSession = null - }) - - con.once('backendKeyData', function (msg) { - self.processID = msg.processID - self.secretKey = msg.secretKey - }) - - const connectingErrorHandler = (err) => { - if (this._connectionError) { - return - } - this._connectionError = true - clearTimeout(connectionTimeoutHandle) - if (callback) { - return callback(err) - } - this.emit('error', err) - } - - const connectedErrorHandler = (err) => { - this._queryable = false - this._errorAllQueries(err) - this.emit('error', err) - } - - const connectedErrorMessageHandler = (msg) => { - const activeQuery = this.activeQuery - - if (!activeQuery) { - connectedErrorHandler(msg) - return + _errorAllQueries(err) { + const enqueueError = (query) => { + process.nextTick(() => { + query.handleError(err, this.connection) + }) } - this.activeQuery = null - activeQuery.handleError(msg, con) - } - - con.on('error', connectingErrorHandler) - con.on('errorMessage', connectingErrorHandler) - - // hook up query handling events to connection - // after the connection initially becomes ready for queries - con.once('readyForQuery', function () { - self._connecting = false - self._connected = true - self._attachListeners(con) - con.removeListener('error', connectingErrorHandler) - con.removeListener('errorMessage', connectingErrorHandler) - con.on('error', connectedErrorHandler) - con.on('errorMessage', connectedErrorMessageHandler) - clearTimeout(connectionTimeoutHandle) - - // process possible callback argument to Client#connect - if (callback) { - callback(null, self) - // remove callback for proper error handling - // after the connect event - callback = null - } - self.emit('connect') - }) - - con.on('readyForQuery', function () { - var activeQuery = self.activeQuery - self.activeQuery = null - self.readyForQuery = true - if (activeQuery) { - activeQuery.handleReadyForQuery(con) - } - self._pulseQueryQueue() - }) - - con.once('end', () => { - const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') - - clearTimeout(connectionTimeoutHandle) - this._errorAllQueries(error) - - if (!this._ending) { - // if the connection is ended without us calling .end() - // on this client then we have an unexpected disconnection - // treat this as an error unless we've already emitted an error - // during connection. - if (this._connecting && !this._connectionError) { - if (callback) { - callback(error) - } else { - connectedErrorHandler(error) - } - } else if (!this._connectionError) { - connectedErrorHandler(error) - } + if (this.activeQuery) { + enqueueError(this.activeQuery) + this.activeQuery = null } - process.nextTick(() => { - this.emit('end') - }) - }) - - con.on('notice', function (msg) { - self.emit('notice', msg) - }) -} - -Client.prototype.connect = function (callback) { - if (callback) { - this._connect(callback) - return + this.queryQueue.forEach(enqueueError) + this.queryQueue.length = 0 } - return new this._Promise((resolve, reject) => { - this._connect((error) => { - if (error) { - reject(error) - } else { - resolve() - } - }) - }) -} - -Client.prototype._attachListeners = function (con) { - const self = this - // delegate rowDescription to active query - con.on('rowDescription', function (msg) { - self.activeQuery.handleRowDescription(msg) - }) - - // delegate dataRow to active query - con.on('dataRow', function (msg) { - self.activeQuery.handleDataRow(msg) - }) - - // delegate portalSuspended to active query - // eslint-disable-next-line no-unused-vars - con.on('portalSuspended', function (msg) { - self.activeQuery.handlePortalSuspended(con) - }) - - // delegate emptyQuery to active query - // eslint-disable-next-line no-unused-vars - con.on('emptyQuery', function (msg) { - self.activeQuery.handleEmptyQuery(con) - }) - - // delegate commandComplete to active query - con.on('commandComplete', function (msg) { - self.activeQuery.handleCommandComplete(msg, con) - }) - - // if a prepared statement has a name and properly parses - // we track that its already been executed so we don't parse - // it again on the same client - // eslint-disable-next-line no-unused-vars - con.on('parseComplete', function (msg) { - if (self.activeQuery.name) { - con.parsedStatements[self.activeQuery.name] = self.activeQuery.text - } - }) - - // eslint-disable-next-line no-unused-vars - con.on('copyInResponse', function (msg) { - self.activeQuery.handleCopyInResponse(self.connection) - }) - - con.on('copyData', function (msg) { - self.activeQuery.handleCopyData(msg, self.connection) - }) - - con.on('notification', function (msg) { - self.emit('notification', msg) - }) -} - -Client.prototype.getStartupConf = function () { - var params = this.connectionParameters - - var data = { - user: params.user, - database: params.database, - } - - var appName = params.application_name || params.fallback_application_name - if (appName) { - data.application_name = appName - } - if (params.replication) { - data.replication = '' + params.replication - } - if (params.statement_timeout) { - data.statement_timeout = String(parseInt(params.statement_timeout, 10)) - } - if (params.idle_in_transaction_session_timeout) { - data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10)) - } - if (params.options) { - data.options = params.options - } - - return data -} - -Client.prototype.cancel = function (client, query) { - if (client.activeQuery === query) { + _connect(callback) { + var self = this var con = this.connection + if (this._connecting || this._connected) { + const err = new Error('Client has already been connected. You cannot reuse a client.') + process.nextTick(() => { + callback(err) + }) + return + } + this._connecting = true + + var connectionTimeoutHandle + if (this._connectionTimeoutMillis > 0) { + connectionTimeoutHandle = setTimeout(() => { + con._ending = true + con.stream.destroy(new Error('timeout expired')) + }, this._connectionTimeoutMillis) + } if (this.host && this.host.indexOf('/') === 0) { con.connect(this.host + '/.s.PGSQL.' + this.port) @@ -408,186 +109,488 @@ Client.prototype.cancel = function (client, query) { con.connect(this.port, this.host) } - // once connection is established send cancel message + // once connection is established send startup message con.on('connect', function () { - con.cancel(client.processID, client.secretKey) - }) - } else if (client.queryQueue.indexOf(query) !== -1) { - client.queryQueue.splice(client.queryQueue.indexOf(query), 1) - } -} - -Client.prototype.setTypeParser = function (oid, format, parseFn) { - return this._types.setTypeParser(oid, format, parseFn) -} - -Client.prototype.getTypeParser = function (oid, format) { - return this._types.getTypeParser(oid, format) -} - -// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c -Client.prototype.escapeIdentifier = function (str) { - return '"' + str.replace(/"/g, '""') + '"' -} - -// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c -Client.prototype.escapeLiteral = function (str) { - var hasBackslash = false - var escaped = "'" - - for (var i = 0; i < str.length; i++) { - var c = str[i] - if (c === "'") { - escaped += c + c - } else if (c === '\\') { - escaped += c + c - hasBackslash = true - } else { - escaped += c - } - } - - escaped += "'" - - if (hasBackslash === true) { - escaped = ' E' + escaped - } - - return escaped -} - -Client.prototype._pulseQueryQueue = function () { - if (this.readyForQuery === true) { - this.activeQuery = this.queryQueue.shift() - if (this.activeQuery) { - this.readyForQuery = false - this.hasExecuted = true - - const queryError = this.activeQuery.submit(this.connection) - if (queryError) { - process.nextTick(() => { - this.activeQuery.handleError(queryError, this.connection) - this.readyForQuery = true - this._pulseQueryQueue() - }) + if (self.ssl) { + con.requestSsl() + } else { + con.startup(self.getStartupConf()) } - } else if (this.hasExecuted) { - this.activeQuery = null - this.emit('drain') - } - } -} + }) -Client.prototype.query = function (config, values, callback) { - // can take in strings, config object or query object - var query - var result - var readTimeout - var readTimeoutTimer - var queryCallback + con.on('sslconnect', function () { + con.startup(self.getStartupConf()) + }) - if (config === null || config === undefined) { - throw new TypeError('Client was passed a null or undefined query') - } else if (typeof config.submit === 'function') { - readTimeout = config.query_timeout || this.connectionParameters.query_timeout - result = query = config - if (typeof values === 'function') { - query.callback = query.callback || values + function checkPgPass(cb) { + return function (msg) { + if (typeof self.password === 'function') { + self._Promise + .resolve() + .then(() => self.password()) + .then((pass) => { + if (pass !== undefined) { + if (typeof pass !== 'string') { + con.emit('error', new TypeError('Password must be a string')) + return + } + self.connectionParameters.password = self.password = pass + } else { + self.connectionParameters.password = self.password = null + } + cb(msg) + }) + .catch((err) => { + con.emit('error', err) + }) + } else if (self.password !== null) { + cb(msg) + } else { + pgPass(self.connectionParameters, function (pass) { + if (undefined !== pass) { + self.connectionParameters.password = self.password = pass + } + cb(msg) + }) + } + } } - } else { - readTimeout = this.connectionParameters.query_timeout - query = new Query(config, values, callback) - if (!query.callback) { - result = new this._Promise((resolve, reject) => { - query.callback = (err, res) => (err ? reject(err) : resolve(res)) + + // password request handling + con.on( + 'authenticationCleartextPassword', + checkPgPass(function () { + con.password(self.password) }) + ) + + // password request handling + con.on( + 'authenticationMD5Password', + checkPgPass(function (msg) { + con.password(utils.postgresMd5PasswordHash(self.user, self.password, msg.salt)) + }) + ) + + // password request handling (SASL) + var saslSession + con.on( + 'authenticationSASL', + checkPgPass(function (msg) { + saslSession = sasl.startSession(msg.mechanisms) + + con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) + }) + ) + + // password request handling (SASL) + con.on('authenticationSASLContinue', function (msg) { + sasl.continueSession(saslSession, self.password, msg.data) + + con.sendSCRAMClientFinalMessage(saslSession.response) + }) + + // password request handling (SASL) + con.on('authenticationSASLFinal', function (msg) { + sasl.finalizeSession(saslSession, msg.data) + + saslSession = null + }) + + con.once('backendKeyData', function (msg) { + self.processID = msg.processID + self.secretKey = msg.secretKey + }) + + const connectingErrorHandler = (err) => { + if (this._connectionError) { + return + } + this._connectionError = true + clearTimeout(connectionTimeoutHandle) + if (callback) { + return callback(err) + } + this.emit('error', err) } - } - if (readTimeout) { - queryCallback = query.callback + const connectedErrorHandler = (err) => { + this._queryable = false + this._errorAllQueries(err) + this.emit('error', err) + } - readTimeoutTimer = setTimeout(() => { - var error = new Error('Query read timeout') + const connectedErrorMessageHandler = (msg) => { + const activeQuery = this.activeQuery + + if (!activeQuery) { + connectedErrorHandler(msg) + return + } + + this.activeQuery = null + activeQuery.handleError(msg, con) + } + + con.on('error', connectingErrorHandler) + con.on('errorMessage', connectingErrorHandler) + + // hook up query handling events to connection + // after the connection initially becomes ready for queries + con.once('readyForQuery', function () { + self._connecting = false + self._connected = true + self._attachListeners(con) + con.removeListener('error', connectingErrorHandler) + con.removeListener('errorMessage', connectingErrorHandler) + con.on('error', connectedErrorHandler) + con.on('errorMessage', connectedErrorMessageHandler) + clearTimeout(connectionTimeoutHandle) + + // process possible callback argument to Client#connect + if (callback) { + callback(null, self) + // remove callback for proper error handling + // after the connect event + callback = null + } + self.emit('connect') + }) + + con.on('readyForQuery', function () { + var activeQuery = self.activeQuery + self.activeQuery = null + self.readyForQuery = true + if (activeQuery) { + activeQuery.handleReadyForQuery(con) + } + self._pulseQueryQueue() + }) + + con.once('end', () => { + const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') + + clearTimeout(connectionTimeoutHandle) + this._errorAllQueries(error) + + if (!this._ending) { + // if the connection is ended without us calling .end() + // on this client then we have an unexpected disconnection + // treat this as an error unless we've already emitted an error + // during connection. + if (this._connecting && !this._connectionError) { + if (callback) { + callback(error) + } else { + connectedErrorHandler(error) + } + } else if (!this._connectionError) { + connectedErrorHandler(error) + } + } process.nextTick(() => { - query.handleError(error, this.connection) + this.emit('end') }) + }) - queryCallback(error) + con.on('notice', function (msg) { + self.emit('notice', msg) + }) + } - // we already returned an error, - // just do nothing if query completes - query.callback = () => {} + connect(callback) { + if (callback) { + this._connect(callback) + return + } - // Remove from queue - var index = this.queryQueue.indexOf(query) - if (index > -1) { - this.queryQueue.splice(index, 1) + return new this._Promise((resolve, reject) => { + this._connect((error) => { + if (error) { + reject(error) + } else { + resolve() + } + }) + }) + } + + _attachListeners(con) { + const self = this + // delegate rowDescription to active query + con.on('rowDescription', function (msg) { + self.activeQuery.handleRowDescription(msg) + }) + + // delegate dataRow to active query + con.on('dataRow', function (msg) { + self.activeQuery.handleDataRow(msg) + }) + + // delegate portalSuspended to active query + // eslint-disable-next-line no-unused-vars + con.on('portalSuspended', function (msg) { + self.activeQuery.handlePortalSuspended(con) + }) + + // delegate emptyQuery to active query + // eslint-disable-next-line no-unused-vars + con.on('emptyQuery', function (msg) { + self.activeQuery.handleEmptyQuery(con) + }) + + // delegate commandComplete to active query + con.on('commandComplete', function (msg) { + self.activeQuery.handleCommandComplete(msg, con) + }) + + // if a prepared statement has a name and properly parses + // we track that its already been executed so we don't parse + // it again on the same client + // eslint-disable-next-line no-unused-vars + con.on('parseComplete', function (msg) { + if (self.activeQuery.name) { + con.parsedStatements[self.activeQuery.name] = self.activeQuery.text + } + }) + + con.on('copyInResponse', this.handleCopyInResponse.bind(this)) + con.on('copyData', this.handleCopyData.bind(this)) + con.on('notification', this.handleNotification.bind(this)) + } + + handleCopyInResponse(msg) { + this.activeQuery.handleCopyInResponse(this.connection) + } + + handleCopyData(msg) { + this.activeQuery.handleCopyData(msg, this.connection) + } + + handleNotification(msg) { + this.emit('notification', msg) + } + + getStartupConf() { + var params = this.connectionParameters + + var data = { + user: params.user, + database: params.database, + } + + var appName = params.application_name || params.fallback_application_name + if (appName) { + data.application_name = appName + } + if (params.replication) { + data.replication = '' + params.replication + } + if (params.statement_timeout) { + data.statement_timeout = String(parseInt(params.statement_timeout, 10)) + } + if (params.idle_in_transaction_session_timeout) { + data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10)) + } + if (params.options) { + data.options = params.options + } + + return data + } + + cancel(client, query) { + if (client.activeQuery === query) { + var con = this.connection + + if (this.host && this.host.indexOf('/') === 0) { + con.connect(this.host + '/.s.PGSQL.' + this.port) + } else { + con.connect(this.port, this.host) } - this._pulseQueryQueue() - }, readTimeout) - - query.callback = (err, res) => { - clearTimeout(readTimeoutTimer) - queryCallback(err, res) + // once connection is established send cancel message + con.on('connect', function () { + con.cancel(client.processID, client.secretKey) + }) + } else if (client.queryQueue.indexOf(query) !== -1) { + client.queryQueue.splice(client.queryQueue.indexOf(query), 1) } } - if (this.binary && !query.binary) { - query.binary = true + setTypeParser(oid, format, parseFn) { + return this._types.setTypeParser(oid, format, parseFn) } - if (query._result && !query._result._types) { - query._result._types = this._types + getTypeParser(oid, format) { + return this._types.getTypeParser(oid, format) } - if (!this._queryable) { - process.nextTick(() => { - query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) - }) - return result + // Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c + escapeIdentifier(str) { + return '"' + str.replace(/"/g, '""') + '"' } - if (this._ending) { - process.nextTick(() => { - query.handleError(new Error('Client was closed and is not queryable'), this.connection) - }) - return result + // Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c + escapeLiteral(str) { + var hasBackslash = false + var escaped = "'" + + for (var i = 0; i < str.length; i++) { + var c = str[i] + if (c === "'") { + escaped += c + c + } else if (c === '\\') { + escaped += c + c + hasBackslash = true + } else { + escaped += c + } + } + + escaped += "'" + + if (hasBackslash === true) { + escaped = ' E' + escaped + } + + return escaped } - this.queryQueue.push(query) - this._pulseQueryQueue() - return result -} + _pulseQueryQueue() { + if (this.readyForQuery === true) { + this.activeQuery = this.queryQueue.shift() + if (this.activeQuery) { + this.readyForQuery = false + this.hasExecuted = true -Client.prototype.end = function (cb) { - this._ending = true + const queryError = this.activeQuery.submit(this.connection) + if (queryError) { + process.nextTick(() => { + this.activeQuery.handleError(queryError, this.connection) + this.readyForQuery = true + this._pulseQueryQueue() + }) + } + } else if (this.hasExecuted) { + this.activeQuery = null + this.emit('drain') + } + } + } - // if we have never connected, then end is a noop, callback immediately - if (!this.connection._connecting) { - if (cb) { - cb() + query(config, values, callback) { + // can take in strings, config object or query object + var query + var result + var readTimeout + var readTimeoutTimer + var queryCallback + + if (config === null || config === undefined) { + throw new TypeError('Client was passed a null or undefined query') + } else if (typeof config.submit === 'function') { + readTimeout = config.query_timeout || this.connectionParameters.query_timeout + result = query = config + if (typeof values === 'function') { + query.callback = query.callback || values + } } else { - return this._Promise.resolve() + readTimeout = this.connectionParameters.query_timeout + query = new Query(config, values, callback) + if (!query.callback) { + result = new this._Promise((resolve, reject) => { + query.callback = (err, res) => (err ? reject(err) : resolve(res)) + }) + } } + + if (readTimeout) { + queryCallback = query.callback + + readTimeoutTimer = setTimeout(() => { + var error = new Error('Query read timeout') + + process.nextTick(() => { + query.handleError(error, this.connection) + }) + + queryCallback(error) + + // we already returned an error, + // just do nothing if query completes + query.callback = () => {} + + // Remove from queue + var index = this.queryQueue.indexOf(query) + if (index > -1) { + this.queryQueue.splice(index, 1) + } + + this._pulseQueryQueue() + }, readTimeout) + + query.callback = (err, res) => { + clearTimeout(readTimeoutTimer) + queryCallback(err, res) + } + } + + if (this.binary && !query.binary) { + query.binary = true + } + + if (query._result && !query._result._types) { + query._result._types = this._types + } + + if (!this._queryable) { + process.nextTick(() => { + query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) + }) + return result + } + + if (this._ending) { + process.nextTick(() => { + query.handleError(new Error('Client was closed and is not queryable'), this.connection) + }) + return result + } + + this.queryQueue.push(query) + this._pulseQueryQueue() + return result } - if (this.activeQuery || !this._queryable) { - // if we have an active query we need to force a disconnect - // on the socket - otherwise a hung query could block end forever - this.connection.stream.destroy() - } else { - this.connection.end() - } + end(cb) { + this._ending = true - if (cb) { - this.connection.once('end', cb) - } else { - return new this._Promise((resolve) => { - this.connection.once('end', resolve) - }) + // if we have never connected, then end is a noop, callback immediately + if (!this.connection._connecting) { + if (cb) { + cb() + } else { + return this._Promise.resolve() + } + } + + if (this.activeQuery || !this._queryable) { + // if we have an active query we need to force a disconnect + // on the socket - otherwise a hung query could block end forever + this.connection.stream.destroy() + } else { + this.connection.end() + } + + if (cb) { + this.connection.once('end', cb) + } else { + return new this._Promise((resolve) => { + this.connection.once('end', resolve) + }) + } } } diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index 54668252..eae798d5 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -40,73 +40,6 @@ var readSSLConfigFromEnvironment = function () { return defaults.ssl } -var ConnectionParameters = function (config) { - // if a string is passed, it is a raw connection string so we parse it into a config - config = typeof config === 'string' ? parse(config) : config || {} - - // if the config has a connectionString defined, parse IT into the config we use - // this will override other default values with what is stored in connectionString - if (config.connectionString) { - config = Object.assign({}, config, parse(config.connectionString)) - } - - this.user = val('user', config) - this.database = val('database', config) - - if (this.database === undefined) { - this.database = this.user - } - - this.port = parseInt(val('port', config), 10) - this.host = val('host', config) - - // "hiding" the password so it doesn't show up in stack traces - // or if the client is console.logged - Object.defineProperty(this, 'password', { - configurable: true, - enumerable: false, - writable: true, - value: val('password', config), - }) - - this.binary = val('binary', config) - this.options = val('options', config) - - this.ssl = typeof config.ssl === 'undefined' ? readSSLConfigFromEnvironment() : config.ssl - - // support passing in ssl=no-verify via connection string - if (this.ssl === 'no-verify') { - this.ssl = { rejectUnauthorized: false } - } - - this.client_encoding = val('client_encoding', config) - this.replication = val('replication', config) - // a domain socket begins with '/' - this.isDomainSocket = !(this.host || '').indexOf('/') - - this.application_name = val('application_name', config, 'PGAPPNAME') - this.fallback_application_name = val('fallback_application_name', config, false) - this.statement_timeout = val('statement_timeout', config, false) - this.idle_in_transaction_session_timeout = val('idle_in_transaction_session_timeout', config, false) - this.query_timeout = val('query_timeout', config, false) - - if (config.connectionTimeoutMillis === undefined) { - this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0 - } else { - this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000) - } - - if (config.keepAlive === false) { - this.keepalives = 0 - } else if (config.keepAlive === true) { - this.keepalives = 1 - } - - if (typeof config.keepAliveInitialDelayMillis === 'number') { - this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000) - } -} - // Convert arg to a string, surround in single quotes, and escape single quotes and backslashes var quoteParamValue = function (value) { return "'" + ('' + value).replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'" @@ -119,43 +52,112 @@ var add = function (params, config, paramName) { } } -ConnectionParameters.prototype.getLibpqConnectionString = function (cb) { - var params = [] - add(params, this, 'user') - add(params, this, 'password') - add(params, this, 'port') - add(params, this, 'application_name') - add(params, this, 'fallback_application_name') - add(params, this, 'connect_timeout') - add(params, this, 'options') +class ConnectionParameters { + constructor(config) { + // if a string is passed, it is a raw connection string so we parse it into a config + config = typeof config === 'string' ? parse(config) : config || {} - var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {} - add(params, ssl, 'sslmode') - add(params, ssl, 'sslca') - add(params, ssl, 'sslkey') - add(params, ssl, 'sslcert') - add(params, ssl, 'sslrootcert') + // if the config has a connectionString defined, parse IT into the config we use + // this will override other default values with what is stored in connectionString + if (config.connectionString) { + config = Object.assign({}, config, parse(config.connectionString)) + } - if (this.database) { - params.push('dbname=' + quoteParamValue(this.database)) + this.user = val('user', config) + this.database = val('database', config) + + if (this.database === undefined) { + this.database = this.user + } + + this.port = parseInt(val('port', config), 10) + this.host = val('host', config) + + // "hiding" the password so it doesn't show up in stack traces + // or if the client is console.logged + Object.defineProperty(this, 'password', { + configurable: true, + enumerable: false, + writable: true, + value: val('password', config), + }) + + this.binary = val('binary', config) + this.options = val('options', config) + + this.ssl = typeof config.ssl === 'undefined' ? readSSLConfigFromEnvironment() : config.ssl + + // support passing in ssl=no-verify via connection string + if (this.ssl === 'no-verify') { + this.ssl = { rejectUnauthorized: false } + } + + this.client_encoding = val('client_encoding', config) + this.replication = val('replication', config) + // a domain socket begins with '/' + this.isDomainSocket = !(this.host || '').indexOf('/') + + this.application_name = val('application_name', config, 'PGAPPNAME') + this.fallback_application_name = val('fallback_application_name', config, false) + this.statement_timeout = val('statement_timeout', config, false) + this.idle_in_transaction_session_timeout = val('idle_in_transaction_session_timeout', config, false) + this.query_timeout = val('query_timeout', config, false) + + if (config.connectionTimeoutMillis === undefined) { + this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0 + } else { + this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000) + } + + if (config.keepAlive === false) { + this.keepalives = 0 + } else if (config.keepAlive === true) { + this.keepalives = 1 + } + + if (typeof config.keepAliveInitialDelayMillis === 'number') { + this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000) + } } - if (this.replication) { - params.push('replication=' + quoteParamValue(this.replication)) + + getLibpqConnectionString(cb) { + var params = [] + add(params, this, 'user') + add(params, this, 'password') + add(params, this, 'port') + add(params, this, 'application_name') + add(params, this, 'fallback_application_name') + add(params, this, 'connect_timeout') + add(params, this, 'options') + + var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {} + add(params, ssl, 'sslmode') + add(params, ssl, 'sslca') + add(params, ssl, 'sslkey') + add(params, ssl, 'sslcert') + add(params, ssl, 'sslrootcert') + + if (this.database) { + params.push('dbname=' + quoteParamValue(this.database)) + } + if (this.replication) { + params.push('replication=' + quoteParamValue(this.replication)) + } + if (this.host) { + params.push('host=' + quoteParamValue(this.host)) + } + if (this.isDomainSocket) { + return cb(null, params.join(' ')) + } + if (this.client_encoding) { + params.push('client_encoding=' + quoteParamValue(this.client_encoding)) + } + dns.lookup(this.host, function (err, address) { + if (err) return cb(err, null) + params.push('hostaddr=' + quoteParamValue(address)) + return cb(null, params.join(' ')) + }) } - if (this.host) { - params.push('host=' + quoteParamValue(this.host)) - } - if (this.isDomainSocket) { - return cb(null, params.join(' ')) - } - if (this.client_encoding) { - params.push('client_encoding=' + quoteParamValue(this.client_encoding)) - } - dns.lookup(this.host, function (err, address) { - if (err) return cb(err, null) - params.push('hostaddr=' + quoteParamValue(address)) - return cb(null, params.join(' ')) - }) } module.exports = ConnectionParameters diff --git a/packages/pg/lib/result.js b/packages/pg/lib/result.js index 233455b0..5e895736 100644 --- a/packages/pg/lib/result.js +++ b/packages/pg/lib/result.js @@ -9,95 +9,170 @@ var types = require('pg-types') +var matchRegexp = /^([A-Za-z]+)(?: (\d+))?(?: (\d+))?/ + // result object returned from query // in the 'end' event and also // passed as second argument to provided callback -var Result = function (rowMode, types) { - this.command = null - this.rowCount = null - this.oid = null - this.rows = [] - this.fields = [] - this._parsers = undefined - this._types = types - this.RowCtor = null - this.rowAsArray = rowMode === 'array' - if (this.rowAsArray) { - this.parseRow = this._parseRowAsArray - } -} - -var matchRegexp = /^([A-Za-z]+)(?: (\d+))?(?: (\d+))?/ - -// adds a command complete message -Result.prototype.addCommandComplete = function (msg) { - var match - if (msg.text) { - // pure javascript - match = matchRegexp.exec(msg.text) - } else { - // native bindings - match = matchRegexp.exec(msg.command) - } - if (match) { - this.command = match[1] - if (match[3]) { - // COMMMAND OID ROWS - this.oid = parseInt(match[2], 10) - this.rowCount = parseInt(match[3], 10) - } else if (match[2]) { - // COMMAND ROWS - this.rowCount = parseInt(match[2], 10) +class Result { + constructor(rowMode, types) { + this.command = null + this.rowCount = null + this.oid = null + this.rows = [] + this.fields = [] + this._parsers = undefined + this._types = types + this.RowCtor = null + this.rowAsArray = rowMode === 'array' + if (this.rowAsArray) { + this.parseRow = this._parseRowAsArray } } -} -Result.prototype._parseRowAsArray = function (rowData) { - var row = new Array(rowData.length) - for (var i = 0, len = rowData.length; i < len; i++) { - var rawValue = rowData[i] - if (rawValue !== null) { - row[i] = this._parsers[i](rawValue) + // adds a command complete message + addCommandComplete(msg) { + var match + if (msg.text) { + // pure javascript + match = matchRegexp.exec(msg.text) } else { - row[i] = null + // native bindings + match = matchRegexp.exec(msg.command) + } + if (match) { + this.command = match[1] + if (match[3]) { + // COMMMAND OID ROWS + this.oid = parseInt(match[2], 10) + this.rowCount = parseInt(match[3], 10) + } else if (match[2]) { + // COMMAND ROWS + this.rowCount = parseInt(match[2], 10) + } } } - return row -} -Result.prototype.parseRow = function (rowData) { - var row = {} - for (var i = 0, len = rowData.length; i < len; i++) { - var rawValue = rowData[i] - var field = this.fields[i].name - if (rawValue !== null) { - row[field] = this._parsers[i](rawValue) - } else { - row[field] = null + _parseRowAsArray(rowData) { + var row = new Array(rowData.length) + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + if (rawValue !== null) { + row[i] = this._parsers[i](rawValue) + } else { + row[i] = null + } + } + return row + } + + parseRow(rowData) { + var row = {} + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + var field = this.fields[i].name + if (rawValue !== null) { + row[field] = this._parsers[i](rawValue) + } else { + row[field] = null + } + } + return row + } + + addRow(row) { + this.rows.push(row) + } + + addFields(fieldDescriptions) { + // clears field definitions + // multiple query statements in 1 action can result in multiple sets + // of rowDescriptions...eg: 'select NOW(); select 1::int;' + // you need to reset the fields + this.fields = fieldDescriptions + if (this.fields.length) { + this._parsers = new Array(fieldDescriptions.length) + } + for (var i = 0; i < fieldDescriptions.length; i++) { + var desc = fieldDescriptions[i] + if (this._types) { + this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } else { + this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } } } - return row -} -Result.prototype.addRow = function (row) { - this.rows.push(row) -} - -Result.prototype.addFields = function (fieldDescriptions) { - // clears field definitions - // multiple query statements in 1 action can result in multiple sets - // of rowDescriptions...eg: 'select NOW(); select 1::int;' - // you need to reset the fields - this.fields = fieldDescriptions - if (this.fields.length) { - this._parsers = new Array(fieldDescriptions.length) - } - for (var i = 0; i < fieldDescriptions.length; i++) { - var desc = fieldDescriptions[i] - if (this._types) { - this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text') + // adds a command complete message + addCommandComplete(msg) { + var match + if (msg.text) { + // pure javascript + match = matchRegexp.exec(msg.text) } else { - this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text') + // native bindings + match = matchRegexp.exec(msg.command) + } + if (match) { + this.command = match[1] + if (match[3]) { + // COMMMAND OID ROWS + this.oid = parseInt(match[2], 10) + this.rowCount = parseInt(match[3], 10) + } else if (match[2]) { + // COMMAND ROWS + this.rowCount = parseInt(match[2], 10) + } + } + } + + _parseRowAsArray(rowData) { + var row = new Array(rowData.length) + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + if (rawValue !== null) { + row[i] = this._parsers[i](rawValue) + } else { + row[i] = null + } + } + return row + } + + parseRow(rowData) { + var row = {} + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + var field = this.fields[i].name + if (rawValue !== null) { + row[field] = this._parsers[i](rawValue) + } else { + row[field] = null + } + } + return row + } + + addRow(row) { + this.rows.push(row) + } + + addFields(fieldDescriptions) { + // clears field definitions + // multiple query statements in 1 action can result in multiple sets + // of rowDescriptions...eg: 'select NOW(); select 1::int;' + // you need to reset the fields + this.fields = fieldDescriptions + if (this.fields.length) { + this._parsers = new Array(fieldDescriptions.length) + } + for (var i = 0; i < fieldDescriptions.length; i++) { + var desc = fieldDescriptions[i] + if (this._types) { + this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } else { + this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } } } }