This commit is contained in:
Brian M. Carlson 2020-07-15 11:31:16 -05:00
parent 0b424cfff1
commit 63e15d15fa

View File

@ -86,6 +86,8 @@ class Client extends EventEmitter {
_connect(callback) {
var self = this
var con = this.connection
this._connectionCallback = callback
if (this._connecting || this._connected) {
const err = new Error('Client has already been connected. You cannot reuse a client.')
process.nextTick(() => {
@ -122,50 +124,7 @@ class Client extends EventEmitter {
con.startup(self.getStartupConf())
})
// password request handling
con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this))
// password request handling
con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this))
// password request handling (SASL)
con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this))
con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this))
con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this))
con.once('backendKeyData', this.handleBackendKeyData.bind(this))
this._connectionCallback = callback
const connectingErrorHandler = this.handleErrorWhileConnecting.bind(this)
const connectedErrorHandler = this.handleErrorWhileConnected.bind(this)
const connectedErrorMessageHandler = this.handleErrorMessage.bind(this)
con.on('error', connectingErrorHandler)
con.on('errorMessage', connectingErrorHandler)
// hook up query handling events to connection
// after the connection initially becomes ready for queries
con.once('readyForQuery', () => {
self._connecting = false
self._connected = true
con.removeListener('error', connectingErrorHandler)
con.removeListener('errorMessage', connectingErrorHandler)
con.on('error', connectedErrorHandler)
con.on('errorMessage', connectedErrorMessageHandler)
clearTimeout(this.connectionTimeoutHandle)
// process possible callback argument to Client#connect
if (this._connectionCallback) {
this._connectionCallback(null, self)
// remove callback for proper error handling
// after the connect event
this._connectionCallback = null
}
self.emit('connect')
})
con.on('readyForQuery', this.handleReadyForQuery.bind(this))
con.on('notice', this.handleNotice.bind(this))
self._attachListeners(con)
this._attachListeners(con)
con.once('end', () => {
const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly')
@ -182,10 +141,10 @@ class Client extends EventEmitter {
if (this._connectionCallback) {
this._connectionCallback(error)
} else {
connectedErrorHandler(error)
this.handleErrorWhileConnected(error)
}
} else if (!this._connectionError) {
connectedErrorHandler(error)
this.handleErrorWhileConnected(error)
}
}
@ -213,6 +172,19 @@ class Client extends EventEmitter {
}
_attachListeners(con) {
// password request handling
con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this))
// password request handling
con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this))
// password request handling (SASL)
con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this))
con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this))
con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this))
con.on('backendKeyData', this.handleBackendKeyData.bind(this))
con.on('error', this.handleErrorWhileConnecting)
con.on('errorMessage', this.handleErrorMessage)
con.on('readyForQuery', this.handleReadyForQuery.bind(this))
con.on('notice', this.handleNotice.bind(this))
con.on('rowDescription', this.handleRowDescription.bind(this))
con.on('dataRow', this.handleDataRow.bind(this))
con.on('portalSuspended', this.handlePortalSuspended.bind(this))
@ -283,7 +255,7 @@ class Client extends EventEmitter {
handleAuthenticationSASLContinue(msg) {
const { saslSession } = this
sasl.continueSession(saslSession, self.password, msg.data)
sasl.continueSession(saslSession, this.password, msg.data)
con.sendSCRAMClientFinalMessage(saslSession.response)
}
@ -298,6 +270,23 @@ class Client extends EventEmitter {
}
handleReadyForQuery(msg) {
if (this._connecting) {
this._connecting = false
this._connected = true
const con = this.connection
con.removeListener('error', this.handleErrorWhileConnecting)
con.on('error', this.handleErrorWhileConnected)
clearTimeout(this.connectionTimeoutHandle)
// process possible callback argument to Client#connect
if (this._connectionCallback) {
this._connectionCallback(null, this)
// remove callback for proper error handling
// after the connect event
this._connectionCallback = null
}
this.emit('connect')
}
const { activeQuery } = this
this.activeQuery = null
this.readyForQuery = true
@ -307,8 +296,8 @@ class Client extends EventEmitter {
this._pulseQueryQueue()
}
// if we receieve an error during the connection process we handle it here
handleErrorWhileConnecting(err) {
// if we receieve an error event or error message during the connection process we handle it here
handleErrorWhileConnecting = (err) => {
if (this._connectionError) {
// TODO(bmc): this is swallowing errors - we shouldn't do this
return
@ -324,14 +313,17 @@ class Client extends EventEmitter {
// if we're connected and we receive an error event from the connection
// this means the socket is dead - do a hard abort of all queries and emit
// the socket error on the client as well
handleErrorWhileConnected(err) {
handleErrorWhileConnected = (err) => {
this._queryable = false
this._errorAllQueries(err)
this.emit('error', err)
}
// handle error messages from the postgres backend
handleErrorMessage(msg) {
handleErrorMessage = (msg) => {
if (this._connecting) {
return this.handleErrorWhileConnecting(msg)
}
const activeQuery = this.activeQuery
if (!activeQuery) {