More refactoring

This commit is contained in:
Brian M. Carlson 2020-07-15 11:05:31 -05:00
parent 04e5297d2e
commit 66e1e76c9b
2 changed files with 146 additions and 137 deletions

View File

@ -61,7 +61,7 @@ const run = async () => {
queries = await bench(client, insert, seconds * 1000)
console.log('insert queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 5799 qps')
console.log('on my laptop best so far seen 6303 qps')
console.log('')
console.log('Warming up bytea test')

View File

@ -122,94 +122,25 @@ class Client extends EventEmitter {
con.startup(self.getStartupConf())
})
function checkPgPass(cb) {
return function (msg) {
if (typeof self.password === 'function') {
self._Promise
.resolve()
.then(() => self.password())
.then((pass) => {
if (pass !== undefined) {
if (typeof pass !== 'string') {
con.emit('error', new TypeError('Password must be a string'))
return
}
self.connectionParameters.password = self.password = pass
} else {
self.connectionParameters.password = self.password = null
}
cb(msg)
})
.catch((err) => {
con.emit('error', err)
})
} else if (self.password !== null) {
cb(msg)
} else {
pgPass(self.connectionParameters, function (pass) {
if (undefined !== pass) {
self.connectionParameters.password = self.password = pass
}
cb(msg)
})
}
}
}
// password request handling
con.on(
'authenticationCleartextPassword',
checkPgPass(function () {
con.password(self.password)
})
)
con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this))
// password request handling
con.on(
'authenticationMD5Password',
checkPgPass(function (msg) {
con.password(utils.postgresMd5PasswordHash(self.user, self.password, msg.salt))
})
)
con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this))
// password request handling (SASL)
var saslSession
con.on(
'authenticationSASL',
checkPgPass(function (msg) {
saslSession = sasl.startSession(msg.mechanisms)
con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response)
})
)
// password request handling (SASL)
con.on('authenticationSASLContinue', function (msg) {
sasl.continueSession(saslSession, self.password, msg.data)
con.sendSCRAMClientFinalMessage(saslSession.response)
})
// password request handling (SASL)
con.on('authenticationSASLFinal', function (msg) {
sasl.finalizeSession(saslSession, msg.data)
saslSession = null
})
con.once('backendKeyData', function (msg) {
self.processID = msg.processID
self.secretKey = msg.secretKey
})
con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this))
con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this))
con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this))
con.once('backendKeyData', this.handleBackendKeyData.bind(this))
this._connectionCallback = callback
const connectingErrorHandler = (err) => {
if (this._connectionError) {
return
}
this._connectionError = true
clearTimeout(connectionTimeoutHandle)
if (callback) {
return callback(err)
if (this._connectionCallback) {
return this._connectionCallback(err)
}
this.emit('error', err)
}
@ -237,10 +168,9 @@ class Client extends EventEmitter {
// hook up query handling events to connection
// after the connection initially becomes ready for queries
con.once('readyForQuery', function () {
con.once('readyForQuery', () => {
self._connecting = false
self._connected = true
self._attachListeners(con)
con.removeListener('error', connectingErrorHandler)
con.removeListener('errorMessage', connectingErrorHandler)
con.on('error', connectedErrorHandler)
@ -248,24 +178,18 @@ class Client extends EventEmitter {
clearTimeout(connectionTimeoutHandle)
// process possible callback argument to Client#connect
if (callback) {
callback(null, self)
if (this._connectionCallback) {
this._connectionCallback(null, self)
// remove callback for proper error handling
// after the connect event
callback = null
this._connectionCallback = null
}
self.emit('connect')
})
con.on('readyForQuery', function () {
var activeQuery = self.activeQuery
self.activeQuery = null
self.readyForQuery = true
if (activeQuery) {
activeQuery.handleReadyForQuery(con)
}
self._pulseQueryQueue()
})
con.on('readyForQuery', this.handleReadyForQuery.bind(this))
con.on('notice', this.handleNotice.bind(this))
self._attachListeners(con)
con.once('end', () => {
const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly')
@ -279,8 +203,8 @@ class Client extends EventEmitter {
// treat this as an error unless we've already emitted an error
// during connection.
if (this._connecting && !this._connectionError) {
if (callback) {
callback(error)
if (this._connectionCallback) {
this._connectionCallback(error)
} else {
connectedErrorHandler(error)
}
@ -293,10 +217,6 @@ class Client extends EventEmitter {
this.emit('end')
})
})
con.on('notice', function (msg) {
self.emit('notice', msg)
})
}
connect(callback) {
@ -317,49 +237,134 @@ class Client extends EventEmitter {
}
_attachListeners(con) {
const self = this
// delegate rowDescription to active query
con.on('rowDescription', function (msg) {
self.activeQuery.handleRowDescription(msg)
})
// delegate dataRow to active query
con.on('dataRow', function (msg) {
self.activeQuery.handleDataRow(msg)
})
// delegate portalSuspended to active query
// eslint-disable-next-line no-unused-vars
con.on('portalSuspended', function (msg) {
self.activeQuery.handlePortalSuspended(con)
})
// delegate emptyQuery to active query
// eslint-disable-next-line no-unused-vars
con.on('emptyQuery', function (msg) {
self.activeQuery.handleEmptyQuery(con)
})
// delegate commandComplete to active query
con.on('commandComplete', function (msg) {
self.activeQuery.handleCommandComplete(msg, con)
})
// if a prepared statement has a name and properly parses
// we track that its already been executed so we don't parse
// it again on the same client
// eslint-disable-next-line no-unused-vars
con.on('parseComplete', function (msg) {
if (self.activeQuery.name) {
con.parsedStatements[self.activeQuery.name] = self.activeQuery.text
}
})
con.on('rowDescription', this.handleRowDescription.bind(this))
con.on('dataRow', this.handleDataRow.bind(this))
con.on('portalSuspended', this.handlePortalSuspended.bind(this))
con.on('emptyQuery', this.handleEmptyQuery.bind(this))
con.on('commandComplete', this.handleCommandComplete.bind(this))
con.on('parseComplete', this.handleParseComplete.bind(this))
con.on('copyInResponse', this.handleCopyInResponse.bind(this))
con.on('copyData', this.handleCopyData.bind(this))
con.on('notification', this.handleNotification.bind(this))
}
// TODO(bmc): deprecate pgpass "built in" integration since this.password can be a function
// it can be supplied by the user if required - this is a breaking change!
_checkPgPass(cb) {
return function (msg) {
if (typeof this.password === 'function') {
this._Promise
.resolve()
.then(() => this.password())
.then((pass) => {
if (pass !== undefined) {
if (typeof pass !== 'string') {
con.emit('error', new TypeError('Password must be a string'))
return
}
this.connectionParameters.password = this.password = pass
} else {
this.connectionParameters.password = this.password = null
}
cb(msg)
})
.catch((err) => {
con.emit('error', err)
})
} else if (this.password !== null) {
cb(msg)
} else {
pgPass(this.connectionParameters, function (pass) {
if (undefined !== pass) {
this.connectionParameters.password = this.password = pass
}
cb(msg)
})
}
}
}
handleAuthenticationCleartextPassword(msg) {
this._checkPgPass(() => {
this.connection.password(this.password)
})
}
handleAuthenticationMD5Password(msg) {
this._checkPgPass((msg) => {
const hashedPassword = utils.postgresMd5PasswordHash(this.user, this.password, msg.salt)
this.connection.password(hashedPassword)
})
}
handleAuthenticationSASL(msg) {
this._checkPgPass((msg) => {
this.saslSession = sasl.startSession(msg.mechanisms)
const con = this.connection
con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response)
})
}
handleAuthenticationSASLContinue(msg) {
const { saslSession } = this
sasl.continueSession(saslSession, self.password, msg.data)
con.sendSCRAMClientFinalMessage(saslSession.response)
}
handleAuthenticationSASLFinal(msg) {
sasl.finalizeSession(this.saslSession, msg.data)
this.saslSession = null
}
handleBackendKeyData(msg) {
this.processID = msg.processID
this.secretKey = msg.secretKey
}
handleReadyForQuery(msg) {
const { activeQuery } = this
this.activeQuery = null
this.readyForQuery = true
if (activeQuery) {
activeQuery.handleReadyForQuery(this.connection)
}
this._pulseQueryQueue()
}
handleRowDescription(msg) {
// delegate rowDescription to active query
this.activeQuery.handleRowDescription(msg)
}
handleDataRow(msg) {
// delegate dataRow to active query
this.activeQuery.handleDataRow(msg)
}
handlePortalSuspended(msg) {
// delegate portalSuspended to active query
this.activeQuery.handlePortalSuspended(this.connection)
}
handleEmptyQuery(msg) {
// delegate emptyQuery to active query
this.activeQuery.handleEmptyQuery(this.connection)
}
handleCommandComplete(msg) {
// delegate commandComplete to active query
this.activeQuery.handleCommandComplete(msg, this.connection)
}
handleParseComplete(msg) {
// if a prepared statement has a name and properly parses
// we track that its already been executed so we don't parse
// it again on the same client
if (this.activeQuery.name) {
this.connection.parsedStatements[this.activeQuery.name] = this.activeQuery.text
}
}
handleCopyInResponse(msg) {
this.activeQuery.handleCopyInResponse(this.connection)
}
@ -372,6 +377,10 @@ class Client extends EventEmitter {
this.emit('notification', msg)
}
handleNotice(msg) {
this.emit('notice', msg)
}
getStartupConf() {
var params = this.connectionParameters