From 66e1e76c9bdc110d9bc42baf71ee6beefd067983 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 11:05:31 -0500 Subject: [PATCH] More refactoring --- packages/pg/bench.js | 2 +- packages/pg/lib/client.js | 281 ++++++++++++++++++++------------------ 2 files changed, 146 insertions(+), 137 deletions(-) diff --git a/packages/pg/bench.js b/packages/pg/bench.js index 1c1aa641..a668aa85 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -61,7 +61,7 @@ const run = async () => { queries = await bench(client, insert, seconds * 1000) console.log('insert queries:', queries) console.log('qps', queries / seconds) - console.log('on my laptop best so far seen 5799 qps') + console.log('on my laptop best so far seen 6303 qps') console.log('') console.log('Warming up bytea test') diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index fd9ecad1..2dbebe85 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -122,94 +122,25 @@ class Client extends EventEmitter { con.startup(self.getStartupConf()) }) - function checkPgPass(cb) { - return function (msg) { - if (typeof self.password === 'function') { - self._Promise - .resolve() - .then(() => self.password()) - .then((pass) => { - if (pass !== undefined) { - if (typeof pass !== 'string') { - con.emit('error', new TypeError('Password must be a string')) - return - } - self.connectionParameters.password = self.password = pass - } else { - self.connectionParameters.password = self.password = null - } - cb(msg) - }) - .catch((err) => { - con.emit('error', err) - }) - } else if (self.password !== null) { - cb(msg) - } else { - pgPass(self.connectionParameters, function (pass) { - if (undefined !== pass) { - self.connectionParameters.password = self.password = pass - } - cb(msg) - }) - } - } - } - // password request handling - con.on( - 'authenticationCleartextPassword', - checkPgPass(function () { - con.password(self.password) - }) - ) - + con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this)) // password request handling - con.on( - 'authenticationMD5Password', - checkPgPass(function (msg) { - con.password(utils.postgresMd5PasswordHash(self.user, self.password, msg.salt)) - }) - ) - + con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this)) // password request handling (SASL) - var saslSession - con.on( - 'authenticationSASL', - checkPgPass(function (msg) { - saslSession = sasl.startSession(msg.mechanisms) - - con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) - }) - ) - - // password request handling (SASL) - con.on('authenticationSASLContinue', function (msg) { - sasl.continueSession(saslSession, self.password, msg.data) - - con.sendSCRAMClientFinalMessage(saslSession.response) - }) - - // password request handling (SASL) - con.on('authenticationSASLFinal', function (msg) { - sasl.finalizeSession(saslSession, msg.data) - - saslSession = null - }) - - con.once('backendKeyData', function (msg) { - self.processID = msg.processID - self.secretKey = msg.secretKey - }) + con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this)) + con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this)) + con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this)) + con.once('backendKeyData', this.handleBackendKeyData.bind(this)) + this._connectionCallback = callback const connectingErrorHandler = (err) => { if (this._connectionError) { return } this._connectionError = true clearTimeout(connectionTimeoutHandle) - if (callback) { - return callback(err) + if (this._connectionCallback) { + return this._connectionCallback(err) } this.emit('error', err) } @@ -237,10 +168,9 @@ class Client extends EventEmitter { // hook up query handling events to connection // after the connection initially becomes ready for queries - con.once('readyForQuery', function () { + con.once('readyForQuery', () => { self._connecting = false self._connected = true - self._attachListeners(con) con.removeListener('error', connectingErrorHandler) con.removeListener('errorMessage', connectingErrorHandler) con.on('error', connectedErrorHandler) @@ -248,24 +178,18 @@ class Client extends EventEmitter { clearTimeout(connectionTimeoutHandle) // process possible callback argument to Client#connect - if (callback) { - callback(null, self) + if (this._connectionCallback) { + this._connectionCallback(null, self) // remove callback for proper error handling // after the connect event - callback = null + this._connectionCallback = null } self.emit('connect') }) - con.on('readyForQuery', function () { - var activeQuery = self.activeQuery - self.activeQuery = null - self.readyForQuery = true - if (activeQuery) { - activeQuery.handleReadyForQuery(con) - } - self._pulseQueryQueue() - }) + con.on('readyForQuery', this.handleReadyForQuery.bind(this)) + con.on('notice', this.handleNotice.bind(this)) + self._attachListeners(con) con.once('end', () => { const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') @@ -279,8 +203,8 @@ class Client extends EventEmitter { // treat this as an error unless we've already emitted an error // during connection. if (this._connecting && !this._connectionError) { - if (callback) { - callback(error) + if (this._connectionCallback) { + this._connectionCallback(error) } else { connectedErrorHandler(error) } @@ -293,10 +217,6 @@ class Client extends EventEmitter { this.emit('end') }) }) - - con.on('notice', function (msg) { - self.emit('notice', msg) - }) } connect(callback) { @@ -317,49 +237,134 @@ class Client extends EventEmitter { } _attachListeners(con) { - const self = this - // delegate rowDescription to active query - con.on('rowDescription', function (msg) { - self.activeQuery.handleRowDescription(msg) - }) - - // delegate dataRow to active query - con.on('dataRow', function (msg) { - self.activeQuery.handleDataRow(msg) - }) - - // delegate portalSuspended to active query - // eslint-disable-next-line no-unused-vars - con.on('portalSuspended', function (msg) { - self.activeQuery.handlePortalSuspended(con) - }) - - // delegate emptyQuery to active query - // eslint-disable-next-line no-unused-vars - con.on('emptyQuery', function (msg) { - self.activeQuery.handleEmptyQuery(con) - }) - - // delegate commandComplete to active query - con.on('commandComplete', function (msg) { - self.activeQuery.handleCommandComplete(msg, con) - }) - - // if a prepared statement has a name and properly parses - // we track that its already been executed so we don't parse - // it again on the same client - // eslint-disable-next-line no-unused-vars - con.on('parseComplete', function (msg) { - if (self.activeQuery.name) { - con.parsedStatements[self.activeQuery.name] = self.activeQuery.text - } - }) - + con.on('rowDescription', this.handleRowDescription.bind(this)) + con.on('dataRow', this.handleDataRow.bind(this)) + con.on('portalSuspended', this.handlePortalSuspended.bind(this)) + con.on('emptyQuery', this.handleEmptyQuery.bind(this)) + con.on('commandComplete', this.handleCommandComplete.bind(this)) + con.on('parseComplete', this.handleParseComplete.bind(this)) con.on('copyInResponse', this.handleCopyInResponse.bind(this)) con.on('copyData', this.handleCopyData.bind(this)) con.on('notification', this.handleNotification.bind(this)) } + // TODO(bmc): deprecate pgpass "built in" integration since this.password can be a function + // it can be supplied by the user if required - this is a breaking change! + _checkPgPass(cb) { + return function (msg) { + if (typeof this.password === 'function') { + this._Promise + .resolve() + .then(() => this.password()) + .then((pass) => { + if (pass !== undefined) { + if (typeof pass !== 'string') { + con.emit('error', new TypeError('Password must be a string')) + return + } + this.connectionParameters.password = this.password = pass + } else { + this.connectionParameters.password = this.password = null + } + cb(msg) + }) + .catch((err) => { + con.emit('error', err) + }) + } else if (this.password !== null) { + cb(msg) + } else { + pgPass(this.connectionParameters, function (pass) { + if (undefined !== pass) { + this.connectionParameters.password = this.password = pass + } + cb(msg) + }) + } + } + } + + handleAuthenticationCleartextPassword(msg) { + this._checkPgPass(() => { + this.connection.password(this.password) + }) + } + + handleAuthenticationMD5Password(msg) { + this._checkPgPass((msg) => { + const hashedPassword = utils.postgresMd5PasswordHash(this.user, this.password, msg.salt) + this.connection.password(hashedPassword) + }) + } + + handleAuthenticationSASL(msg) { + this._checkPgPass((msg) => { + this.saslSession = sasl.startSession(msg.mechanisms) + const con = this.connection + con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) + }) + } + + handleAuthenticationSASLContinue(msg) { + const { saslSession } = this + sasl.continueSession(saslSession, self.password, msg.data) + con.sendSCRAMClientFinalMessage(saslSession.response) + } + + handleAuthenticationSASLFinal(msg) { + sasl.finalizeSession(this.saslSession, msg.data) + this.saslSession = null + } + + handleBackendKeyData(msg) { + this.processID = msg.processID + this.secretKey = msg.secretKey + } + + handleReadyForQuery(msg) { + const { activeQuery } = this + this.activeQuery = null + this.readyForQuery = true + if (activeQuery) { + activeQuery.handleReadyForQuery(this.connection) + } + this._pulseQueryQueue() + } + + handleRowDescription(msg) { + // delegate rowDescription to active query + this.activeQuery.handleRowDescription(msg) + } + + handleDataRow(msg) { + // delegate dataRow to active query + this.activeQuery.handleDataRow(msg) + } + + handlePortalSuspended(msg) { + // delegate portalSuspended to active query + this.activeQuery.handlePortalSuspended(this.connection) + } + + handleEmptyQuery(msg) { + // delegate emptyQuery to active query + this.activeQuery.handleEmptyQuery(this.connection) + } + + handleCommandComplete(msg) { + // delegate commandComplete to active query + this.activeQuery.handleCommandComplete(msg, this.connection) + } + + handleParseComplete(msg) { + // if a prepared statement has a name and properly parses + // we track that its already been executed so we don't parse + // it again on the same client + if (this.activeQuery.name) { + this.connection.parsedStatements[this.activeQuery.name] = this.activeQuery.text + } + } + handleCopyInResponse(msg) { this.activeQuery.handleCopyInResponse(this.connection) } @@ -372,6 +377,10 @@ class Client extends EventEmitter { this.emit('notification', msg) } + handleNotice(msg) { + this.emit('notice', msg) + } + getStartupConf() { var params = this.connectionParameters