diff --git a/packages/pg/bench.js b/packages/pg/bench.js index 1c1aa641..a668aa85 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -61,7 +61,7 @@ const run = async () => { queries = await bench(client, insert, seconds * 1000) console.log('insert queries:', queries) console.log('qps', queries / seconds) - console.log('on my laptop best so far seen 5799 qps') + console.log('on my laptop best so far seen 6303 qps') console.log('') console.log('Warming up bytea test') diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 93dfc6c9..bc91924e 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -19,388 +19,91 @@ var Query = require('./query') var defaults = require('./defaults') var Connection = require('./connection') -var Client = function (config) { - EventEmitter.call(this) +class Client extends EventEmitter { + constructor(config) { + super() - this.connectionParameters = new ConnectionParameters(config) - this.user = this.connectionParameters.user - this.database = this.connectionParameters.database - this.port = this.connectionParameters.port - this.host = this.connectionParameters.host + this.connectionParameters = new ConnectionParameters(config) + this.user = this.connectionParameters.user + this.database = this.connectionParameters.database + this.port = this.connectionParameters.port + this.host = this.connectionParameters.host - // "hiding" the password so it doesn't show up in stack traces - // or if the client is console.logged - Object.defineProperty(this, 'password', { - configurable: true, - enumerable: false, - writable: true, - value: this.connectionParameters.password, - }) - - this.replication = this.connectionParameters.replication - - var c = config || {} - - this._Promise = c.Promise || global.Promise - this._types = new TypeOverrides(c.types) - this._ending = false - this._connecting = false - this._connected = false - this._connectionError = false - this._queryable = true - - this.connection = - c.connection || - new Connection({ - stream: c.stream, - ssl: this.connectionParameters.ssl, - keepAlive: c.keepAlive || false, - keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, - encoding: this.connectionParameters.client_encoding || 'utf8', + // "hiding" the password so it doesn't show up in stack traces + // or if the client is console.logged + Object.defineProperty(this, 'password', { + configurable: true, + enumerable: false, + writable: true, + value: this.connectionParameters.password, }) - this.queryQueue = [] - this.binary = c.binary || defaults.binary - this.processID = null - this.secretKey = null - this.ssl = this.connectionParameters.ssl || false - this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 -} -util.inherits(Client, EventEmitter) + this.replication = this.connectionParameters.replication -Client.prototype._errorAllQueries = function (err) { - const enqueueError = (query) => { - process.nextTick(() => { - query.handleError(err, this.connection) - }) + var c = config || {} + + this._Promise = c.Promise || global.Promise + this._types = new TypeOverrides(c.types) + this._ending = false + this._connecting = false + this._connected = false + this._connectionError = false + this._queryable = true + + this.connection = + c.connection || + new Connection({ + stream: c.stream, + ssl: this.connectionParameters.ssl, + keepAlive: c.keepAlive || false, + keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, + encoding: this.connectionParameters.client_encoding || 'utf8', + }) + this.queryQueue = [] + this.binary = c.binary || defaults.binary + this.processID = null + this.secretKey = null + this.ssl = this.connectionParameters.ssl || false + this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 } - if (this.activeQuery) { - enqueueError(this.activeQuery) - this.activeQuery = null - } - - this.queryQueue.forEach(enqueueError) - this.queryQueue.length = 0 -} - -Client.prototype._connect = function (callback) { - var self = this - var con = this.connection - if (this._connecting || this._connected) { - const err = new Error('Client has already been connected. You cannot reuse a client.') - process.nextTick(() => { - callback(err) - }) - return - } - this._connecting = true - - var connectionTimeoutHandle - if (this._connectionTimeoutMillis > 0) { - connectionTimeoutHandle = setTimeout(() => { - con._ending = true - con.stream.destroy(new Error('timeout expired')) - }, this._connectionTimeoutMillis) - } - - if (this.host && this.host.indexOf('/') === 0) { - con.connect(this.host + '/.s.PGSQL.' + this.port) - } else { - con.connect(this.port, this.host) - } - - // once connection is established send startup message - con.on('connect', function () { - if (self.ssl) { - con.requestSsl() - } else { - con.startup(self.getStartupConf()) - } - }) - - con.on('sslconnect', function () { - con.startup(self.getStartupConf()) - }) - - function checkPgPass(cb) { - return function (msg) { - if (typeof self.password === 'function') { - self._Promise - .resolve() - .then(() => self.password()) - .then((pass) => { - if (pass !== undefined) { - if (typeof pass !== 'string') { - con.emit('error', new TypeError('Password must be a string')) - return - } - self.connectionParameters.password = self.password = pass - } else { - self.connectionParameters.password = self.password = null - } - cb(msg) - }) - .catch((err) => { - con.emit('error', err) - }) - } else if (self.password !== null) { - cb(msg) - } else { - pgPass(self.connectionParameters, function (pass) { - if (undefined !== pass) { - self.connectionParameters.password = self.password = pass - } - cb(msg) - }) - } - } - } - - // password request handling - con.on( - 'authenticationCleartextPassword', - checkPgPass(function () { - con.password(self.password) - }) - ) - - // password request handling - con.on( - 'authenticationMD5Password', - checkPgPass(function (msg) { - con.password(utils.postgresMd5PasswordHash(self.user, self.password, msg.salt)) - }) - ) - - // password request handling (SASL) - var saslSession - con.on( - 'authenticationSASL', - checkPgPass(function (msg) { - saslSession = sasl.startSession(msg.mechanisms) - - con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) - }) - ) - - // password request handling (SASL) - con.on('authenticationSASLContinue', function (msg) { - sasl.continueSession(saslSession, self.password, msg.data) - - con.sendSCRAMClientFinalMessage(saslSession.response) - }) - - // password request handling (SASL) - con.on('authenticationSASLFinal', function (msg) { - sasl.finalizeSession(saslSession, msg.data) - - saslSession = null - }) - - con.once('backendKeyData', function (msg) { - self.processID = msg.processID - self.secretKey = msg.secretKey - }) - - const connectingErrorHandler = (err) => { - if (this._connectionError) { - return - } - this._connectionError = true - clearTimeout(connectionTimeoutHandle) - if (callback) { - return callback(err) - } - this.emit('error', err) - } - - const connectedErrorHandler = (err) => { - this._queryable = false - this._errorAllQueries(err) - this.emit('error', err) - } - - const connectedErrorMessageHandler = (msg) => { - const activeQuery = this.activeQuery - - if (!activeQuery) { - connectedErrorHandler(msg) - return + _errorAllQueries(err) { + const enqueueError = (query) => { + process.nextTick(() => { + query.handleError(err, this.connection) + }) } - this.activeQuery = null - activeQuery.handleError(msg, con) - } - - con.on('error', connectingErrorHandler) - con.on('errorMessage', connectingErrorHandler) - - // hook up query handling events to connection - // after the connection initially becomes ready for queries - con.once('readyForQuery', function () { - self._connecting = false - self._connected = true - self._attachListeners(con) - con.removeListener('error', connectingErrorHandler) - con.removeListener('errorMessage', connectingErrorHandler) - con.on('error', connectedErrorHandler) - con.on('errorMessage', connectedErrorMessageHandler) - clearTimeout(connectionTimeoutHandle) - - // process possible callback argument to Client#connect - if (callback) { - callback(null, self) - // remove callback for proper error handling - // after the connect event - callback = null - } - self.emit('connect') - }) - - con.on('readyForQuery', function () { - var activeQuery = self.activeQuery - self.activeQuery = null - self.readyForQuery = true - if (activeQuery) { - activeQuery.handleReadyForQuery(con) - } - self._pulseQueryQueue() - }) - - con.once('end', () => { - const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') - - clearTimeout(connectionTimeoutHandle) - this._errorAllQueries(error) - - if (!this._ending) { - // if the connection is ended without us calling .end() - // on this client then we have an unexpected disconnection - // treat this as an error unless we've already emitted an error - // during connection. - if (this._connecting && !this._connectionError) { - if (callback) { - callback(error) - } else { - connectedErrorHandler(error) - } - } else if (!this._connectionError) { - connectedErrorHandler(error) - } + if (this.activeQuery) { + enqueueError(this.activeQuery) + this.activeQuery = null } - process.nextTick(() => { - this.emit('end') - }) - }) - - con.on('notice', function (msg) { - self.emit('notice', msg) - }) -} - -Client.prototype.connect = function (callback) { - if (callback) { - this._connect(callback) - return + this.queryQueue.forEach(enqueueError) + this.queryQueue.length = 0 } - return new this._Promise((resolve, reject) => { - this._connect((error) => { - if (error) { - reject(error) - } else { - resolve() - } - }) - }) -} - -Client.prototype._attachListeners = function (con) { - const self = this - // delegate rowDescription to active query - con.on('rowDescription', function (msg) { - self.activeQuery.handleRowDescription(msg) - }) - - // delegate dataRow to active query - con.on('dataRow', function (msg) { - self.activeQuery.handleDataRow(msg) - }) - - // delegate portalSuspended to active query - // eslint-disable-next-line no-unused-vars - con.on('portalSuspended', function (msg) { - self.activeQuery.handlePortalSuspended(con) - }) - - // delegate emptyQuery to active query - // eslint-disable-next-line no-unused-vars - con.on('emptyQuery', function (msg) { - self.activeQuery.handleEmptyQuery(con) - }) - - // delegate commandComplete to active query - con.on('commandComplete', function (msg) { - self.activeQuery.handleCommandComplete(msg, con) - }) - - // if a prepared statement has a name and properly parses - // we track that its already been executed so we don't parse - // it again on the same client - // eslint-disable-next-line no-unused-vars - con.on('parseComplete', function (msg) { - if (self.activeQuery.name) { - con.parsedStatements[self.activeQuery.name] = self.activeQuery.text - } - }) - - // eslint-disable-next-line no-unused-vars - con.on('copyInResponse', function (msg) { - self.activeQuery.handleCopyInResponse(self.connection) - }) - - con.on('copyData', function (msg) { - self.activeQuery.handleCopyData(msg, self.connection) - }) - - con.on('notification', function (msg) { - self.emit('notification', msg) - }) -} - -Client.prototype.getStartupConf = function () { - var params = this.connectionParameters - - var data = { - user: params.user, - database: params.database, - } - - var appName = params.application_name || params.fallback_application_name - if (appName) { - data.application_name = appName - } - if (params.replication) { - data.replication = '' + params.replication - } - if (params.statement_timeout) { - data.statement_timeout = String(parseInt(params.statement_timeout, 10)) - } - if (params.idle_in_transaction_session_timeout) { - data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10)) - } - if (params.options) { - data.options = params.options - } - - return data -} - -Client.prototype.cancel = function (client, query) { - if (client.activeQuery === query) { + _connect(callback) { + var self = this var con = this.connection + this._connectionCallback = callback + + if (this._connecting || this._connected) { + const err = new Error('Client has already been connected. You cannot reuse a client.') + process.nextTick(() => { + callback(err) + }) + return + } + this._connecting = true + + this.connectionTimeoutHandle + if (this._connectionTimeoutMillis > 0) { + this.connectionTimeoutHandle = setTimeout(() => { + con._ending = true + con.stream.destroy(new Error('timeout expired')) + }, this._connectionTimeoutMillis) + } if (this.host && this.host.indexOf('/') === 0) { con.connect(this.host + '/.s.PGSQL.' + this.port) @@ -408,187 +111,498 @@ Client.prototype.cancel = function (client, query) { con.connect(this.port, this.host) } - // once connection is established send cancel message + // once connection is established send startup message con.on('connect', function () { - con.cancel(client.processID, client.secretKey) - }) - } else if (client.queryQueue.indexOf(query) !== -1) { - client.queryQueue.splice(client.queryQueue.indexOf(query), 1) - } -} - -Client.prototype.setTypeParser = function (oid, format, parseFn) { - return this._types.setTypeParser(oid, format, parseFn) -} - -Client.prototype.getTypeParser = function (oid, format) { - return this._types.getTypeParser(oid, format) -} - -// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c -Client.prototype.escapeIdentifier = function (str) { - return '"' + str.replace(/"/g, '""') + '"' -} - -// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c -Client.prototype.escapeLiteral = function (str) { - var hasBackslash = false - var escaped = "'" - - for (var i = 0; i < str.length; i++) { - var c = str[i] - if (c === "'") { - escaped += c + c - } else if (c === '\\') { - escaped += c + c - hasBackslash = true - } else { - escaped += c - } - } - - escaped += "'" - - if (hasBackslash === true) { - escaped = ' E' + escaped - } - - return escaped -} - -Client.prototype._pulseQueryQueue = function () { - if (this.readyForQuery === true) { - this.activeQuery = this.queryQueue.shift() - if (this.activeQuery) { - this.readyForQuery = false - this.hasExecuted = true - - const queryError = this.activeQuery.submit(this.connection) - if (queryError) { - process.nextTick(() => { - this.activeQuery.handleError(queryError, this.connection) - this.readyForQuery = true - this._pulseQueryQueue() - }) + if (self.ssl) { + con.requestSsl() + } else { + con.startup(self.getStartupConf()) } - } else if (this.hasExecuted) { - this.activeQuery = null - this.emit('drain') - } - } -} + }) -Client.prototype.query = function (config, values, callback) { - // can take in strings, config object or query object - var query - var result - var readTimeout - var readTimeoutTimer - var queryCallback + con.on('sslconnect', function () { + con.startup(self.getStartupConf()) + }) - if (config === null || config === undefined) { - throw new TypeError('Client was passed a null or undefined query') - } else if (typeof config.submit === 'function') { - readTimeout = config.query_timeout || this.connectionParameters.query_timeout - result = query = config - if (typeof values === 'function') { - query.callback = query.callback || values - } - } else { - readTimeout = this.connectionParameters.query_timeout - query = new Query(config, values, callback) - if (!query.callback) { - result = new this._Promise((resolve, reject) => { - query.callback = (err, res) => (err ? reject(err) : resolve(res)) - }) - } - } + this._attachListeners(con) - if (readTimeout) { - queryCallback = query.callback + con.once('end', () => { + const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') - readTimeoutTimer = setTimeout(() => { - var error = new Error('Query read timeout') + clearTimeout(this.connectionTimeoutHandle) + this._errorAllQueries(error) + + if (!this._ending) { + // if the connection is ended without us calling .end() + // on this client then we have an unexpected disconnection + // treat this as an error unless we've already emitted an error + // during connection. + if (this._connecting && !this._connectionError) { + if (this._connectionCallback) { + this._connectionCallback(error) + } else { + this._handleErrorEvent(error) + } + } else if (!this._connectionError) { + this._handleErrorEvent(error) + } + } process.nextTick(() => { - query.handleError(error, this.connection) + this.emit('end') }) + }) + } - queryCallback(error) - - // we already returned an error, - // just do nothing if query completes - query.callback = () => {} - - // Remove from queue - var index = this.queryQueue.indexOf(query) - if (index > -1) { - this.queryQueue.splice(index, 1) - } - - this._pulseQueryQueue() - }, readTimeout) - - query.callback = (err, res) => { - clearTimeout(readTimeoutTimer) - queryCallback(err, res) + connect(callback) { + if (callback) { + this._connect(callback) + return } - } - if (this.binary && !query.binary) { - query.binary = true - } - - if (query._result && !query._result._types) { - query._result._types = this._types - } - - if (!this._queryable) { - process.nextTick(() => { - query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) + return new this._Promise((resolve, reject) => { + this._connect((error) => { + if (error) { + reject(error) + } else { + resolve() + } + }) }) - return result } - if (this._ending) { - process.nextTick(() => { - query.handleError(new Error('Client was closed and is not queryable'), this.connection) - }) - return result + _attachListeners(con) { + // password request handling + con.on('authenticationCleartextPassword', this._handleAuthCleartextPassword.bind(this)) + // password request handling + con.on('authenticationMD5Password', this._handleAuthMD5Password.bind(this)) + // password request handling (SASL) + con.on('authenticationSASL', this._handleAuthSASL.bind(this)) + con.on('authenticationSASLContinue', this._handleAuthSASLContinue.bind(this)) + con.on('authenticationSASLFinal', this._handleAuthSASLFinal.bind(this)) + con.on('backendKeyData', this._handleBackendKeyData.bind(this)) + con.on('error', this._handleErrorEvent.bind(this)) + con.on('errorMessage', this._handleErrorMessage.bind(this)) + con.on('readyForQuery', this._handleReadyForQuery.bind(this)) + con.on('notice', this._handleNotice.bind(this)) + con.on('rowDescription', this._handleRowDescription.bind(this)) + con.on('dataRow', this._handleDataRow.bind(this)) + con.on('portalSuspended', this._handlePortalSuspended.bind(this)) + con.on('emptyQuery', this._handleEmptyQuery.bind(this)) + con.on('commandComplete', this._handleCommandComplete.bind(this)) + con.on('parseComplete', this._handleParseComplete.bind(this)) + con.on('copyInResponse', this._handleCopyInResponse.bind(this)) + con.on('copyData', this._handleCopyData.bind(this)) + con.on('notification', this._handleNotification.bind(this)) } - this.queryQueue.push(query) - this._pulseQueryQueue() - return result -} - -Client.prototype.end = function (cb) { - this._ending = true - - // if we have never connected, then end is a noop, callback immediately - if (!this.connection._connecting) { - if (cb) { + // TODO(bmc): deprecate pgpass "built in" integration since this.password can be a function + // it can be supplied by the user if required - this is a breaking change! + _checkPgPass(cb) { + const con = this.connection + if (typeof this.password === 'function') { + this._Promise + .resolve() + .then(() => this.password()) + .then((pass) => { + if (pass !== undefined) { + if (typeof pass !== 'string') { + con.emit('error', new TypeError('Password must be a string')) + return + } + this.connectionParameters.password = this.password = pass + } else { + this.connectionParameters.password = this.password = null + } + cb() + }) + .catch((err) => { + con.emit('error', err) + }) + } else if (this.password !== null) { cb() } else { - return this._Promise.resolve() + pgPass(this.connectionParameters, function (pass) { + if (undefined !== pass) { + this.connectionParameters.password = this.password = pass + } + cb() + }) } } - if (this.activeQuery || !this._queryable) { - // if we have an active query we need to force a disconnect - // on the socket - otherwise a hung query could block end forever - this.connection.stream.destroy() - } else { - this.connection.end() + _handleAuthCleartextPassword(msg) { + this._checkPgPass(() => { + this.connection.password(this.password) + }) } - if (cb) { - this.connection.once('end', cb) - } else { - return new this._Promise((resolve) => { - this.connection.once('end', resolve) + _handleAuthMD5Password(msg) { + this._checkPgPass(() => { + const hashedPassword = utils.postgresMd5PasswordHash(this.user, this.password, msg.salt) + this.connection.password(hashedPassword) }) } + + _handleAuthSASL(msg) { + this._checkPgPass(() => { + this.saslSession = sasl.startSession(msg.mechanisms) + this.connection.sendSASLInitialResponseMessage(this.saslSession.mechanism, this.saslSession.response) + }) + } + + _handleAuthSASLContinue(msg) { + sasl.continueSession(this.saslSession, this.password, msg.data) + this.connection.sendSCRAMClientFinalMessage(this.saslSession.response) + } + + _handleAuthSASLFinal(msg) { + sasl.finalizeSession(this.saslSession, msg.data) + this.saslSession = null + } + + _handleBackendKeyData(msg) { + this.processID = msg.processID + this.secretKey = msg.secretKey + } + + _handleReadyForQuery(msg) { + if (this._connecting) { + this._connecting = false + this._connected = true + clearTimeout(this.connectionTimeoutHandle) + + // process possible callback argument to Client#connect + if (this._connectionCallback) { + this._connectionCallback(null, this) + // remove callback for proper error handling + // after the connect event + this._connectionCallback = null + } + this.emit('connect') + } + const { activeQuery } = this + this.activeQuery = null + this.readyForQuery = true + if (activeQuery) { + activeQuery.handleReadyForQuery(this.connection) + } + this._pulseQueryQueue() + } + + // if we receieve an error event or error message + // during the connection process we handle it here + _handleErrorWhileConnecting(err) { + if (this._connectionError) { + // TODO(bmc): this is swallowing errors - we shouldn't do this + return + } + this._connectionError = true + clearTimeout(this.connectionTimeoutHandle) + if (this._connectionCallback) { + return this._connectionCallback(err) + } + this.emit('error', err) + } + + // if we're connected and we receive an error event from the connection + // this means the socket is dead - do a hard abort of all queries and emit + // the socket error on the client as well + _handleErrorEvent(err) { + if (this._connecting) { + return this._handleErrorWhileConnecting(err) + } + this._queryable = false + this._errorAllQueries(err) + this.emit('error', err) + } + + // handle error messages from the postgres backend + _handleErrorMessage(msg) { + if (this._connecting) { + return this._handleErrorWhileConnecting(msg) + } + const activeQuery = this.activeQuery + + if (!activeQuery) { + this._handleErrorEvent(msg) + return + } + + this.activeQuery = null + activeQuery.handleError(msg, this.connection) + } + + _handleRowDescription(msg) { + // delegate rowDescription to active query + this.activeQuery.handleRowDescription(msg) + } + + _handleDataRow(msg) { + // delegate dataRow to active query + this.activeQuery.handleDataRow(msg) + } + + _handlePortalSuspended(msg) { + // delegate portalSuspended to active query + this.activeQuery.handlePortalSuspended(this.connection) + } + + _handleEmptyQuery(msg) { + // delegate emptyQuery to active query + this.activeQuery.handleEmptyQuery(this.connection) + } + + _handleCommandComplete(msg) { + // delegate commandComplete to active query + this.activeQuery.handleCommandComplete(msg, this.connection) + } + + _handleParseComplete(msg) { + // if a prepared statement has a name and properly parses + // we track that its already been executed so we don't parse + // it again on the same client + if (this.activeQuery.name) { + this.connection.parsedStatements[this.activeQuery.name] = this.activeQuery.text + } + } + + _handleCopyInResponse(msg) { + this.activeQuery.handleCopyInResponse(this.connection) + } + + _handleCopyData(msg) { + this.activeQuery.handleCopyData(msg, this.connection) + } + + _handleNotification(msg) { + this.emit('notification', msg) + } + + _handleNotice(msg) { + this.emit('notice', msg) + } + + getStartupConf() { + var params = this.connectionParameters + + var data = { + user: params.user, + database: params.database, + } + + var appName = params.application_name || params.fallback_application_name + if (appName) { + data.application_name = appName + } + if (params.replication) { + data.replication = '' + params.replication + } + if (params.statement_timeout) { + data.statement_timeout = String(parseInt(params.statement_timeout, 10)) + } + if (params.idle_in_transaction_session_timeout) { + data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10)) + } + if (params.options) { + data.options = params.options + } + + return data + } + + cancel(client, query) { + if (client.activeQuery === query) { + var con = this.connection + + if (this.host && this.host.indexOf('/') === 0) { + con.connect(this.host + '/.s.PGSQL.' + this.port) + } else { + con.connect(this.port, this.host) + } + + // once connection is established send cancel message + con.on('connect', function () { + con.cancel(client.processID, client.secretKey) + }) + } else if (client.queryQueue.indexOf(query) !== -1) { + client.queryQueue.splice(client.queryQueue.indexOf(query), 1) + } + } + + setTypeParser(oid, format, parseFn) { + return this._types.setTypeParser(oid, format, parseFn) + } + + getTypeParser(oid, format) { + return this._types.getTypeParser(oid, format) + } + + // Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c + escapeIdentifier(str) { + return '"' + str.replace(/"/g, '""') + '"' + } + + // Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c + escapeLiteral(str) { + var hasBackslash = false + var escaped = "'" + + for (var i = 0; i < str.length; i++) { + var c = str[i] + if (c === "'") { + escaped += c + c + } else if (c === '\\') { + escaped += c + c + hasBackslash = true + } else { + escaped += c + } + } + + escaped += "'" + + if (hasBackslash === true) { + escaped = ' E' + escaped + } + + return escaped + } + + _pulseQueryQueue() { + if (this.readyForQuery === true) { + this.activeQuery = this.queryQueue.shift() + if (this.activeQuery) { + this.readyForQuery = false + this.hasExecuted = true + + const queryError = this.activeQuery.submit(this.connection) + if (queryError) { + process.nextTick(() => { + this.activeQuery.handleError(queryError, this.connection) + this.readyForQuery = true + this._pulseQueryQueue() + }) + } + } else if (this.hasExecuted) { + this.activeQuery = null + this.emit('drain') + } + } + } + + query(config, values, callback) { + // can take in strings, config object or query object + var query + var result + var readTimeout + var readTimeoutTimer + var queryCallback + + if (config === null || config === undefined) { + throw new TypeError('Client was passed a null or undefined query') + } else if (typeof config.submit === 'function') { + readTimeout = config.query_timeout || this.connectionParameters.query_timeout + result = query = config + if (typeof values === 'function') { + query.callback = query.callback || values + } + } else { + readTimeout = this.connectionParameters.query_timeout + query = new Query(config, values, callback) + if (!query.callback) { + result = new this._Promise((resolve, reject) => { + query.callback = (err, res) => (err ? reject(err) : resolve(res)) + }) + } + } + + if (readTimeout) { + queryCallback = query.callback + + readTimeoutTimer = setTimeout(() => { + var error = new Error('Query read timeout') + + process.nextTick(() => { + query.handleError(error, this.connection) + }) + + queryCallback(error) + + // we already returned an error, + // just do nothing if query completes + query.callback = () => {} + + // Remove from queue + var index = this.queryQueue.indexOf(query) + if (index > -1) { + this.queryQueue.splice(index, 1) + } + + this._pulseQueryQueue() + }, readTimeout) + + query.callback = (err, res) => { + clearTimeout(readTimeoutTimer) + queryCallback(err, res) + } + } + + if (this.binary && !query.binary) { + query.binary = true + } + + if (query._result && !query._result._types) { + query._result._types = this._types + } + + if (!this._queryable) { + process.nextTick(() => { + query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) + }) + return result + } + + if (this._ending) { + process.nextTick(() => { + query.handleError(new Error('Client was closed and is not queryable'), this.connection) + }) + return result + } + + this.queryQueue.push(query) + this._pulseQueryQueue() + return result + } + + end(cb) { + this._ending = true + + // if we have never connected, then end is a noop, callback immediately + if (!this.connection._connecting) { + if (cb) { + cb() + } else { + return this._Promise.resolve() + } + } + + if (this.activeQuery || !this._queryable) { + // if we have an active query we need to force a disconnect + // on the socket - otherwise a hung query could block end forever + this.connection.stream.destroy() + } else { + this.connection.end() + } + + if (cb) { + this.connection.once('end', cb) + } else { + return new this._Promise((resolve) => { + this.connection.once('end', resolve) + }) + } + } } // expose a Query constructor diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index 54668252..eae798d5 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -40,73 +40,6 @@ var readSSLConfigFromEnvironment = function () { return defaults.ssl } -var ConnectionParameters = function (config) { - // if a string is passed, it is a raw connection string so we parse it into a config - config = typeof config === 'string' ? parse(config) : config || {} - - // if the config has a connectionString defined, parse IT into the config we use - // this will override other default values with what is stored in connectionString - if (config.connectionString) { - config = Object.assign({}, config, parse(config.connectionString)) - } - - this.user = val('user', config) - this.database = val('database', config) - - if (this.database === undefined) { - this.database = this.user - } - - this.port = parseInt(val('port', config), 10) - this.host = val('host', config) - - // "hiding" the password so it doesn't show up in stack traces - // or if the client is console.logged - Object.defineProperty(this, 'password', { - configurable: true, - enumerable: false, - writable: true, - value: val('password', config), - }) - - this.binary = val('binary', config) - this.options = val('options', config) - - this.ssl = typeof config.ssl === 'undefined' ? readSSLConfigFromEnvironment() : config.ssl - - // support passing in ssl=no-verify via connection string - if (this.ssl === 'no-verify') { - this.ssl = { rejectUnauthorized: false } - } - - this.client_encoding = val('client_encoding', config) - this.replication = val('replication', config) - // a domain socket begins with '/' - this.isDomainSocket = !(this.host || '').indexOf('/') - - this.application_name = val('application_name', config, 'PGAPPNAME') - this.fallback_application_name = val('fallback_application_name', config, false) - this.statement_timeout = val('statement_timeout', config, false) - this.idle_in_transaction_session_timeout = val('idle_in_transaction_session_timeout', config, false) - this.query_timeout = val('query_timeout', config, false) - - if (config.connectionTimeoutMillis === undefined) { - this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0 - } else { - this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000) - } - - if (config.keepAlive === false) { - this.keepalives = 0 - } else if (config.keepAlive === true) { - this.keepalives = 1 - } - - if (typeof config.keepAliveInitialDelayMillis === 'number') { - this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000) - } -} - // Convert arg to a string, surround in single quotes, and escape single quotes and backslashes var quoteParamValue = function (value) { return "'" + ('' + value).replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'" @@ -119,43 +52,112 @@ var add = function (params, config, paramName) { } } -ConnectionParameters.prototype.getLibpqConnectionString = function (cb) { - var params = [] - add(params, this, 'user') - add(params, this, 'password') - add(params, this, 'port') - add(params, this, 'application_name') - add(params, this, 'fallback_application_name') - add(params, this, 'connect_timeout') - add(params, this, 'options') +class ConnectionParameters { + constructor(config) { + // if a string is passed, it is a raw connection string so we parse it into a config + config = typeof config === 'string' ? parse(config) : config || {} - var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {} - add(params, ssl, 'sslmode') - add(params, ssl, 'sslca') - add(params, ssl, 'sslkey') - add(params, ssl, 'sslcert') - add(params, ssl, 'sslrootcert') + // if the config has a connectionString defined, parse IT into the config we use + // this will override other default values with what is stored in connectionString + if (config.connectionString) { + config = Object.assign({}, config, parse(config.connectionString)) + } - if (this.database) { - params.push('dbname=' + quoteParamValue(this.database)) + this.user = val('user', config) + this.database = val('database', config) + + if (this.database === undefined) { + this.database = this.user + } + + this.port = parseInt(val('port', config), 10) + this.host = val('host', config) + + // "hiding" the password so it doesn't show up in stack traces + // or if the client is console.logged + Object.defineProperty(this, 'password', { + configurable: true, + enumerable: false, + writable: true, + value: val('password', config), + }) + + this.binary = val('binary', config) + this.options = val('options', config) + + this.ssl = typeof config.ssl === 'undefined' ? readSSLConfigFromEnvironment() : config.ssl + + // support passing in ssl=no-verify via connection string + if (this.ssl === 'no-verify') { + this.ssl = { rejectUnauthorized: false } + } + + this.client_encoding = val('client_encoding', config) + this.replication = val('replication', config) + // a domain socket begins with '/' + this.isDomainSocket = !(this.host || '').indexOf('/') + + this.application_name = val('application_name', config, 'PGAPPNAME') + this.fallback_application_name = val('fallback_application_name', config, false) + this.statement_timeout = val('statement_timeout', config, false) + this.idle_in_transaction_session_timeout = val('idle_in_transaction_session_timeout', config, false) + this.query_timeout = val('query_timeout', config, false) + + if (config.connectionTimeoutMillis === undefined) { + this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0 + } else { + this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000) + } + + if (config.keepAlive === false) { + this.keepalives = 0 + } else if (config.keepAlive === true) { + this.keepalives = 1 + } + + if (typeof config.keepAliveInitialDelayMillis === 'number') { + this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000) + } } - if (this.replication) { - params.push('replication=' + quoteParamValue(this.replication)) + + getLibpqConnectionString(cb) { + var params = [] + add(params, this, 'user') + add(params, this, 'password') + add(params, this, 'port') + add(params, this, 'application_name') + add(params, this, 'fallback_application_name') + add(params, this, 'connect_timeout') + add(params, this, 'options') + + var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {} + add(params, ssl, 'sslmode') + add(params, ssl, 'sslca') + add(params, ssl, 'sslkey') + add(params, ssl, 'sslcert') + add(params, ssl, 'sslrootcert') + + if (this.database) { + params.push('dbname=' + quoteParamValue(this.database)) + } + if (this.replication) { + params.push('replication=' + quoteParamValue(this.replication)) + } + if (this.host) { + params.push('host=' + quoteParamValue(this.host)) + } + if (this.isDomainSocket) { + return cb(null, params.join(' ')) + } + if (this.client_encoding) { + params.push('client_encoding=' + quoteParamValue(this.client_encoding)) + } + dns.lookup(this.host, function (err, address) { + if (err) return cb(err, null) + params.push('hostaddr=' + quoteParamValue(address)) + return cb(null, params.join(' ')) + }) } - if (this.host) { - params.push('host=' + quoteParamValue(this.host)) - } - if (this.isDomainSocket) { - return cb(null, params.join(' ')) - } - if (this.client_encoding) { - params.push('client_encoding=' + quoteParamValue(this.client_encoding)) - } - dns.lookup(this.host, function (err, address) { - if (err) return cb(err, null) - params.push('hostaddr=' + quoteParamValue(address)) - return cb(null, params.join(' ')) - }) } module.exports = ConnectionParameters diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 65867026..2142a401 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -13,201 +13,199 @@ var util = require('util') const { parse, serialize } = require('pg-protocol') -// TODO(bmc) support binary mode at some point -var Connection = function (config) { - EventEmitter.call(this) - config = config || {} - this.stream = config.stream || new net.Socket() - this._keepAlive = config.keepAlive - this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis - this.lastBuffer = false - this.parsedStatements = {} - this.ssl = config.ssl || false - this._ending = false - this._emitMessage = false - var self = this - this.on('newListener', function (eventName) { - if (eventName === 'message') { - self._emitMessage = true - } - }) -} - -util.inherits(Connection, EventEmitter) - -Connection.prototype.connect = function (port, host) { - var self = this - - this._connecting = true - this.stream.setNoDelay(true) - this.stream.connect(port, host) - - this.stream.once('connect', function () { - if (self._keepAlive) { - self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) - } - self.emit('connect') - }) - - const reportStreamError = function (error) { - // errors about disconnections should be ignored during disconnect - if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) { - return - } - self.emit('error', error) - } - this.stream.on('error', reportStreamError) - - this.stream.on('close', function () { - self.emit('end') - }) - - if (!this.ssl) { - return this.attachListeners(this.stream) - } - - this.stream.once('data', function (buffer) { - var responseCode = buffer.toString('utf8') - switch (responseCode) { - case 'S': // Server supports SSL connections, continue with a secure connection - break - case 'N': // Server does not support SSL connections - self.stream.end() - return self.emit('error', new Error('The server does not support SSL connections')) - default: - // Any other response byte, including 'E' (ErrorResponse) indicating a server error - self.stream.end() - return self.emit('error', new Error('There was an error establishing an SSL connection')) - } - var tls = require('tls') - const options = Object.assign( - { - socket: self.stream, - }, - self.ssl - ) - if (net.isIP(host) === 0) { - options.servername = host - } - self.stream = tls.connect(options) - self.attachListeners(self.stream) - self.stream.on('error', reportStreamError) - - self.emit('sslconnect') - }) -} - -Connection.prototype.attachListeners = function (stream) { - stream.on('end', () => { - this.emit('end') - }) - parse(stream, (msg) => { - var eventName = msg.name === 'error' ? 'errorMessage' : msg.name - if (this._emitMessage) { - this.emit('message', msg) - } - this.emit(eventName, msg) - }) -} - -Connection.prototype.requestSsl = function () { - this.stream.write(serialize.requestSsl()) -} - -Connection.prototype.startup = function (config) { - this.stream.write(serialize.startup(config)) -} - -Connection.prototype.cancel = function (processID, secretKey) { - this._send(serialize.cancel(processID, secretKey)) -} - -Connection.prototype.password = function (password) { - this._send(serialize.password(password)) -} - -Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) { - this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) -} - -Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) { - this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) -} - -Connection.prototype._send = function (buffer) { - if (!this.stream.writable) { - return false - } - return this.stream.write(buffer) -} - -Connection.prototype.query = function (text) { - this._send(serialize.query(text)) -} - -// send parse message -Connection.prototype.parse = function (query) { - this._send(serialize.parse(query)) -} - -// send bind message -// "more" === true to buffer the message until flush() is called -Connection.prototype.bind = function (config) { - this._send(serialize.bind(config)) -} - -// send execute message -// "more" === true to buffer the message until flush() is called -Connection.prototype.execute = function (config) { - this._send(serialize.execute(config)) -} - const flushBuffer = serialize.flush() -Connection.prototype.flush = function () { - if (this.stream.writable) { - this.stream.write(flushBuffer) - } -} - const syncBuffer = serialize.sync() -Connection.prototype.sync = function () { - this._ending = true - this._send(flushBuffer) - this._send(syncBuffer) -} - const endBuffer = serialize.end() -Connection.prototype.end = function () { - // 0x58 = 'X' - this._ending = true - if (!this._connecting || !this.stream.writable) { - this.stream.end() - return +// TODO(bmc) support binary mode at some point +class Connection extends EventEmitter { + constructor(config) { + super() + config = config || {} + this.stream = config.stream || new net.Socket() + this._keepAlive = config.keepAlive + this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis + this.lastBuffer = false + this.parsedStatements = {} + this.ssl = config.ssl || false + this._ending = false + this._emitMessage = false + var self = this + this.on('newListener', function (eventName) { + if (eventName === 'message') { + self._emitMessage = true + } + }) } - return this.stream.write(endBuffer, () => { - this.stream.end() - }) -} -Connection.prototype.close = function (msg) { - this._send(serialize.close(msg)) -} + connect(port, host) { + var self = this -Connection.prototype.describe = function (msg) { - this._send(serialize.describe(msg)) -} + this._connecting = true + this.stream.setNoDelay(true) + this.stream.connect(port, host) -Connection.prototype.sendCopyFromChunk = function (chunk) { - this._send(serialize.copyData(chunk)) -} + this.stream.once('connect', function () { + if (self._keepAlive) { + self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) + } + self.emit('connect') + }) -Connection.prototype.endCopyFrom = function () { - this._send(serialize.copyDone()) -} + const reportStreamError = function (error) { + // errors about disconnections should be ignored during disconnect + if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) { + return + } + self.emit('error', error) + } + this.stream.on('error', reportStreamError) -Connection.prototype.sendCopyFail = function (msg) { - this._send(serialize.copyFail(msg)) + this.stream.on('close', function () { + self.emit('end') + }) + + if (!this.ssl) { + return this.attachListeners(this.stream) + } + + this.stream.once('data', function (buffer) { + var responseCode = buffer.toString('utf8') + switch (responseCode) { + case 'S': // Server supports SSL connections, continue with a secure connection + break + case 'N': // Server does not support SSL connections + self.stream.end() + return self.emit('error', new Error('The server does not support SSL connections')) + default: + // Any other response byte, including 'E' (ErrorResponse) indicating a server error + self.stream.end() + return self.emit('error', new Error('There was an error establishing an SSL connection')) + } + var tls = require('tls') + const options = Object.assign( + { + socket: self.stream, + }, + self.ssl + ) + if (net.isIP(host) === 0) { + options.servername = host + } + self.stream = tls.connect(options) + self.attachListeners(self.stream) + self.stream.on('error', reportStreamError) + + self.emit('sslconnect') + }) + } + + attachListeners(stream) { + stream.on('end', () => { + this.emit('end') + }) + parse(stream, (msg) => { + var eventName = msg.name === 'error' ? 'errorMessage' : msg.name + if (this._emitMessage) { + this.emit('message', msg) + } + this.emit(eventName, msg) + }) + } + + requestSsl() { + this.stream.write(serialize.requestSsl()) + } + + startup(config) { + this.stream.write(serialize.startup(config)) + } + + cancel(processID, secretKey) { + this._send(serialize.cancel(processID, secretKey)) + } + + password(password) { + this._send(serialize.password(password)) + } + + sendSASLInitialResponseMessage(mechanism, initialResponse) { + this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) + } + + sendSCRAMClientFinalMessage(additionalData) { + this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) + } + + _send(buffer) { + if (!this.stream.writable) { + return false + } + return this.stream.write(buffer) + } + + query(text) { + this._send(serialize.query(text)) + } + + // send parse message + parse(query) { + this._send(serialize.parse(query)) + } + + // send bind message + bind(config) { + this._send(serialize.bind(config)) + } + + // send execute message + execute(config) { + this._send(serialize.execute(config)) + } + + flush() { + if (this.stream.writable) { + this.stream.write(flushBuffer) + } + } + + sync() { + this._ending = true + this._send(flushBuffer) + this._send(syncBuffer) + } + + end() { + // 0x58 = 'X' + this._ending = true + if (!this._connecting || !this.stream.writable) { + this.stream.end() + return + } + return this.stream.write(endBuffer, () => { + this.stream.end() + }) + } + + close(msg) { + this._send(serialize.close(msg)) + } + + describe(msg) { + this._send(serialize.describe(msg)) + } + + sendCopyFromChunk(chunk) { + this._send(serialize.copyData(chunk)) + } + + endCopyFrom() { + this._send(serialize.copyDone()) + } + + sendCopyFail(msg) { + this._send(serialize.copyFail(msg)) + } } module.exports = Connection diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index 2392b710..37098ac8 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -176,30 +176,26 @@ class Query extends EventEmitter { } _getRows(connection, rows) { - connection.execute( - { - portal: this.portal, - rows: rows, - }, - true - ) + connection.execute({ + portal: this.portal, + rows: rows, + }) connection.flush() } + // http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY prepare(connection) { // prepared statements need sync to be called after each command // complete or when an error is encountered this.isPreparedStatement = true + // TODO refactor this poor encapsulation if (!this.hasBeenParsed(connection)) { - connection.parse( - { - text: this.text, - name: this.name, - types: this.types, - }, - true - ) + connection.parse({ + text: this.text, + name: this.name, + types: this.types, + }) } if (this.values) { @@ -211,24 +207,17 @@ class Query extends EventEmitter { } } - // http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY - connection.bind( - { - portal: this.portal, - statement: this.name, - values: this.values, - binary: this.binary, - }, - true - ) + connection.bind({ + portal: this.portal, + statement: this.name, + values: this.values, + binary: this.binary, + }) - connection.describe( - { - type: 'P', - name: this.portal || '', - }, - true - ) + connection.describe({ + type: 'P', + name: this.portal || '', + }) this._getRows(connection, this.rows) } diff --git a/packages/pg/lib/result.js b/packages/pg/lib/result.js index 233455b0..5e895736 100644 --- a/packages/pg/lib/result.js +++ b/packages/pg/lib/result.js @@ -9,95 +9,170 @@ var types = require('pg-types') +var matchRegexp = /^([A-Za-z]+)(?: (\d+))?(?: (\d+))?/ + // result object returned from query // in the 'end' event and also // passed as second argument to provided callback -var Result = function (rowMode, types) { - this.command = null - this.rowCount = null - this.oid = null - this.rows = [] - this.fields = [] - this._parsers = undefined - this._types = types - this.RowCtor = null - this.rowAsArray = rowMode === 'array' - if (this.rowAsArray) { - this.parseRow = this._parseRowAsArray - } -} - -var matchRegexp = /^([A-Za-z]+)(?: (\d+))?(?: (\d+))?/ - -// adds a command complete message -Result.prototype.addCommandComplete = function (msg) { - var match - if (msg.text) { - // pure javascript - match = matchRegexp.exec(msg.text) - } else { - // native bindings - match = matchRegexp.exec(msg.command) - } - if (match) { - this.command = match[1] - if (match[3]) { - // COMMMAND OID ROWS - this.oid = parseInt(match[2], 10) - this.rowCount = parseInt(match[3], 10) - } else if (match[2]) { - // COMMAND ROWS - this.rowCount = parseInt(match[2], 10) +class Result { + constructor(rowMode, types) { + this.command = null + this.rowCount = null + this.oid = null + this.rows = [] + this.fields = [] + this._parsers = undefined + this._types = types + this.RowCtor = null + this.rowAsArray = rowMode === 'array' + if (this.rowAsArray) { + this.parseRow = this._parseRowAsArray } } -} -Result.prototype._parseRowAsArray = function (rowData) { - var row = new Array(rowData.length) - for (var i = 0, len = rowData.length; i < len; i++) { - var rawValue = rowData[i] - if (rawValue !== null) { - row[i] = this._parsers[i](rawValue) + // adds a command complete message + addCommandComplete(msg) { + var match + if (msg.text) { + // pure javascript + match = matchRegexp.exec(msg.text) } else { - row[i] = null + // native bindings + match = matchRegexp.exec(msg.command) + } + if (match) { + this.command = match[1] + if (match[3]) { + // COMMMAND OID ROWS + this.oid = parseInt(match[2], 10) + this.rowCount = parseInt(match[3], 10) + } else if (match[2]) { + // COMMAND ROWS + this.rowCount = parseInt(match[2], 10) + } } } - return row -} -Result.prototype.parseRow = function (rowData) { - var row = {} - for (var i = 0, len = rowData.length; i < len; i++) { - var rawValue = rowData[i] - var field = this.fields[i].name - if (rawValue !== null) { - row[field] = this._parsers[i](rawValue) - } else { - row[field] = null + _parseRowAsArray(rowData) { + var row = new Array(rowData.length) + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + if (rawValue !== null) { + row[i] = this._parsers[i](rawValue) + } else { + row[i] = null + } + } + return row + } + + parseRow(rowData) { + var row = {} + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + var field = this.fields[i].name + if (rawValue !== null) { + row[field] = this._parsers[i](rawValue) + } else { + row[field] = null + } + } + return row + } + + addRow(row) { + this.rows.push(row) + } + + addFields(fieldDescriptions) { + // clears field definitions + // multiple query statements in 1 action can result in multiple sets + // of rowDescriptions...eg: 'select NOW(); select 1::int;' + // you need to reset the fields + this.fields = fieldDescriptions + if (this.fields.length) { + this._parsers = new Array(fieldDescriptions.length) + } + for (var i = 0; i < fieldDescriptions.length; i++) { + var desc = fieldDescriptions[i] + if (this._types) { + this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } else { + this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } } } - return row -} -Result.prototype.addRow = function (row) { - this.rows.push(row) -} - -Result.prototype.addFields = function (fieldDescriptions) { - // clears field definitions - // multiple query statements in 1 action can result in multiple sets - // of rowDescriptions...eg: 'select NOW(); select 1::int;' - // you need to reset the fields - this.fields = fieldDescriptions - if (this.fields.length) { - this._parsers = new Array(fieldDescriptions.length) - } - for (var i = 0; i < fieldDescriptions.length; i++) { - var desc = fieldDescriptions[i] - if (this._types) { - this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text') + // adds a command complete message + addCommandComplete(msg) { + var match + if (msg.text) { + // pure javascript + match = matchRegexp.exec(msg.text) } else { - this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text') + // native bindings + match = matchRegexp.exec(msg.command) + } + if (match) { + this.command = match[1] + if (match[3]) { + // COMMMAND OID ROWS + this.oid = parseInt(match[2], 10) + this.rowCount = parseInt(match[3], 10) + } else if (match[2]) { + // COMMAND ROWS + this.rowCount = parseInt(match[2], 10) + } + } + } + + _parseRowAsArray(rowData) { + var row = new Array(rowData.length) + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + if (rawValue !== null) { + row[i] = this._parsers[i](rawValue) + } else { + row[i] = null + } + } + return row + } + + parseRow(rowData) { + var row = {} + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + var field = this.fields[i].name + if (rawValue !== null) { + row[field] = this._parsers[i](rawValue) + } else { + row[field] = null + } + } + return row + } + + addRow(row) { + this.rows.push(row) + } + + addFields(fieldDescriptions) { + // clears field definitions + // multiple query statements in 1 action can result in multiple sets + // of rowDescriptions...eg: 'select NOW(); select 1::int;' + // you need to reset the fields + this.fields = fieldDescriptions + if (this.fields.length) { + this._parsers = new Array(fieldDescriptions.length) + } + for (var i = 0; i < fieldDescriptions.length; i++) { + var desc = fieldDescriptions[i] + if (this._types) { + this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } else { + this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } } } }