diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 926fa6bb..7f1356e9 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -86,6 +86,8 @@ class Client extends EventEmitter { _connect(callback) { var self = this var con = this.connection + this._connectionCallback = callback + if (this._connecting || this._connected) { const err = new Error('Client has already been connected. You cannot reuse a client.') process.nextTick(() => { @@ -122,50 +124,7 @@ class Client extends EventEmitter { con.startup(self.getStartupConf()) }) - // password request handling - con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this)) - // password request handling - con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this)) - // password request handling (SASL) - con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this)) - con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this)) - con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this)) - con.once('backendKeyData', this.handleBackendKeyData.bind(this)) - - this._connectionCallback = callback - const connectingErrorHandler = this.handleErrorWhileConnecting.bind(this) - - const connectedErrorHandler = this.handleErrorWhileConnected.bind(this) - - const connectedErrorMessageHandler = this.handleErrorMessage.bind(this) - - con.on('error', connectingErrorHandler) - con.on('errorMessage', connectingErrorHandler) - - // hook up query handling events to connection - // after the connection initially becomes ready for queries - con.once('readyForQuery', () => { - self._connecting = false - self._connected = true - con.removeListener('error', connectingErrorHandler) - con.removeListener('errorMessage', connectingErrorHandler) - con.on('error', connectedErrorHandler) - con.on('errorMessage', connectedErrorMessageHandler) - clearTimeout(this.connectionTimeoutHandle) - - // process possible callback argument to Client#connect - if (this._connectionCallback) { - this._connectionCallback(null, self) - // remove callback for proper error handling - // after the connect event - this._connectionCallback = null - } - self.emit('connect') - }) - - con.on('readyForQuery', this.handleReadyForQuery.bind(this)) - con.on('notice', this.handleNotice.bind(this)) - self._attachListeners(con) + this._attachListeners(con) con.once('end', () => { const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') @@ -182,10 +141,10 @@ class Client extends EventEmitter { if (this._connectionCallback) { this._connectionCallback(error) } else { - connectedErrorHandler(error) + this.handleErrorWhileConnected(error) } } else if (!this._connectionError) { - connectedErrorHandler(error) + this.handleErrorWhileConnected(error) } } @@ -213,6 +172,19 @@ class Client extends EventEmitter { } _attachListeners(con) { + // password request handling + con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this)) + // password request handling + con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this)) + // password request handling (SASL) + con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this)) + con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this)) + con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this)) + con.on('backendKeyData', this.handleBackendKeyData.bind(this)) + con.on('error', this.handleErrorWhileConnecting) + con.on('errorMessage', this.handleErrorMessage) + con.on('readyForQuery', this.handleReadyForQuery.bind(this)) + con.on('notice', this.handleNotice.bind(this)) con.on('rowDescription', this.handleRowDescription.bind(this)) con.on('dataRow', this.handleDataRow.bind(this)) con.on('portalSuspended', this.handlePortalSuspended.bind(this)) @@ -283,7 +255,7 @@ class Client extends EventEmitter { handleAuthenticationSASLContinue(msg) { const { saslSession } = this - sasl.continueSession(saslSession, self.password, msg.data) + sasl.continueSession(saslSession, this.password, msg.data) con.sendSCRAMClientFinalMessage(saslSession.response) } @@ -298,6 +270,23 @@ class Client extends EventEmitter { } handleReadyForQuery(msg) { + if (this._connecting) { + this._connecting = false + this._connected = true + const con = this.connection + con.removeListener('error', this.handleErrorWhileConnecting) + con.on('error', this.handleErrorWhileConnected) + clearTimeout(this.connectionTimeoutHandle) + + // process possible callback argument to Client#connect + if (this._connectionCallback) { + this._connectionCallback(null, this) + // remove callback for proper error handling + // after the connect event + this._connectionCallback = null + } + this.emit('connect') + } const { activeQuery } = this this.activeQuery = null this.readyForQuery = true @@ -307,8 +296,8 @@ class Client extends EventEmitter { this._pulseQueryQueue() } - // if we receieve an error during the connection process we handle it here - handleErrorWhileConnecting(err) { + // if we receieve an error event or error message during the connection process we handle it here + handleErrorWhileConnecting = (err) => { if (this._connectionError) { // TODO(bmc): this is swallowing errors - we shouldn't do this return @@ -324,14 +313,17 @@ class Client extends EventEmitter { // if we're connected and we receive an error event from the connection // this means the socket is dead - do a hard abort of all queries and emit // the socket error on the client as well - handleErrorWhileConnected(err) { + handleErrorWhileConnected = (err) => { this._queryable = false this._errorAllQueries(err) this.emit('error', err) } // handle error messages from the postgres backend - handleErrorMessage(msg) { + handleErrorMessage = (msg) => { + if (this._connecting) { + return this.handleErrorWhileConnecting(msg) + } const activeQuery = this.activeQuery if (!activeQuery) {