Make active query a private prop

This commit is contained in:
Brian Carlson 2025-07-16 13:54:02 -05:00
parent 01fadd93d7
commit d995350e12

View File

@ -42,6 +42,7 @@ class Client extends EventEmitter {
this._connected = false
this._connectionError = false
this._queryable = true
this._activeQuery = null
this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered
this.connection =
@ -70,6 +71,20 @@ class Client extends EventEmitter {
this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0
}
get activeQuery() {
console.warn('Warning: Client.activeQuery is deprecated and will be removed in a future version.')
return this._activeQuery
}
set activeQuery(val) {
console.warn('Warning: Client.activeQuery is deprecated and will be removed in a future version.')
this._activeQuery = val
}
_getActiveQuery() {
return this._activeQuery
}
_errorAllQueries(err) {
const enqueueError = (query) => {
process.nextTick(() => {
@ -77,9 +92,10 @@ class Client extends EventEmitter {
})
}
if (this.activeQuery) {
enqueueError(this.activeQuery)
this.activeQuery = null
const activeQuery = this._getActiveQuery()
if (activeQuery) {
enqueueError(activeQuery)
this._activeQuery = null
}
this.queryQueue.forEach(enqueueError)
@ -314,8 +330,8 @@ class Client extends EventEmitter {
}
this.emit('connect')
}
const { activeQuery } = this
this.activeQuery = null
const activeQuery = this._getActiveQuery()
this._activeQuery = null
this.readyForQuery = true
if (activeQuery) {
activeQuery.handleReadyForQuery(this.connection)
@ -355,49 +371,51 @@ class Client extends EventEmitter {
if (this._connecting) {
return this._handleErrorWhileConnecting(msg)
}
const activeQuery = this.activeQuery
const activeQuery = this._getActiveQuery()
if (!activeQuery) {
this._handleErrorEvent(msg)
return
}
this.activeQuery = null
this._activeQuery = null
activeQuery.handleError(msg, this.connection)
}
_handleRowDescription(msg) {
// delegate rowDescription to active query
this.activeQuery.handleRowDescription(msg)
this._getActiveQuery().handleRowDescription(msg)
}
_handleDataRow(msg) {
// delegate dataRow to active query
this.activeQuery.handleDataRow(msg)
this._getActiveQuery().handleDataRow(msg)
}
_handlePortalSuspended(msg) {
// delegate portalSuspended to active query
this.activeQuery.handlePortalSuspended(this.connection)
this._getActiveQuery().handlePortalSuspended(this.connection)
}
_handleEmptyQuery(msg) {
// delegate emptyQuery to active query
this.activeQuery.handleEmptyQuery(this.connection)
this._getActiveQuery().handleEmptyQuery(this.connection)
}
_handleCommandComplete(msg) {
if (this.activeQuery == null) {
const activeQuery = this._getActiveQuery()
if (activeQuery == null) {
const error = new Error('Received unexpected commandComplete message from backend.')
this._handleErrorEvent(error)
return
}
// delegate commandComplete to active query
this.activeQuery.handleCommandComplete(msg, this.connection)
activeQuery.handleCommandComplete(msg, this.connection)
}
_handleParseComplete() {
if (this.activeQuery == null) {
const activeQuery = this._getActiveQuery()
if (activeQuery == null) {
const error = new Error('Received unexpected parseComplete message from backend.')
this._handleErrorEvent(error)
return
@ -405,17 +423,17 @@ class Client extends EventEmitter {
// if a prepared statement has a name and properly parses
// we track that its already been executed so we don't parse
// it again on the same client
if (this.activeQuery.name) {
this.connection.parsedStatements[this.activeQuery.name] = this.activeQuery.text
if (activeQuery.name) {
this.connection.parsedStatements[activeQuery.name] = activeQuery.text
}
}
_handleCopyInResponse(msg) {
this.activeQuery.handleCopyInResponse(this.connection)
this._getActiveQuery().handleCopyInResponse(this.connection)
}
_handleCopyData(msg) {
this.activeQuery.handleCopyData(msg, this.connection)
this._getActiveQuery().handleCopyData(msg, this.connection)
}
_handleNotification(msg) {
@ -497,21 +515,22 @@ class Client extends EventEmitter {
_pulseQueryQueue() {
if (this.readyForQuery === true) {
this.activeQuery = this.queryQueue.shift()
if (this.activeQuery) {
this._activeQuery = this.queryQueue.shift()
const activeQuery = this._getActiveQuery()
if (activeQuery) {
this.readyForQuery = false
this.hasExecuted = true
const queryError = this.activeQuery.submit(this.connection)
const queryError = activeQuery.submit(this.connection)
if (queryError) {
process.nextTick(() => {
this.activeQuery.handleError(queryError, this.connection)
activeQuery.handleError(queryError, this.connection)
this.readyForQuery = true
this._pulseQueryQueue()
})
}
} else if (this.hasExecuted) {
this.activeQuery = null
this._activeQuery = null
this.emit('drain')
}
}
@ -540,7 +559,7 @@ class Client extends EventEmitter {
result = new this._Promise((resolve, reject) => {
query.callback = (err, res) => (err ? reject(err) : resolve(res))
}).catch((err) => {
// replace the stack trace that leads to `TCP.onStreamRead` with one that leads back to the
// replace the stack trace that leads to \`TCP.onStreamRead\` with one that leads back to the
// application that created the query
Error.captureStackTrace(err)
throw err
@ -626,7 +645,7 @@ class Client extends EventEmitter {
}
}
if (this.activeQuery || !this._queryable) {
if (this._getActiveQuery() || !this._queryable) {
// if we have an active query we need to force a disconnect
// on the socket - otherwise a hung query could block end forever
this.connection.stream.destroy()