node-postgres/packages/pg/lib/connection.js
Brian Crowell 684cd09bce
Allow Node to exit if the pool is idle (#2568)
Based on the suggestion from #2078. This adds ref/unref methods to the
Connection and Client classes and then uses them to allow the process to
exit if all of the connections in the pool are idle. This behavior is
controlled by the allowExitOnIdle flag to the Pool constructor; it defaults
to the old behavior.
2021-07-27 11:29:07 -05:00

222 lines
5.0 KiB
JavaScript

'use strict'
var net = require('net')
var EventEmitter = require('events').EventEmitter
const { parse, serialize } = require('pg-protocol')
// Results of serialize.flush(), serialize.sync() and serialize.end(), computed
// once when this module loads. Connection#flush, Connection#sync and
// Connection#end write these same values every time instead of calling the
// serializer again on each use.
const flushBuffer = serialize.flush()
const syncBuffer = serialize.sync()
const endBuffer = serialize.end()
// TODO(bmc) support binary mode at some point
/**
 * Event-emitting wrapper around a socket-like stream.
 *
 * Outgoing messages are built with `serialize` and written to `this.stream`;
 * incoming data is handed to `parse`, and every parsed message is re-emitted
 * as an event named after the message (a message named 'error' is emitted as
 * 'errorMessage').
 *
 * Events emitted by this class itself: 'connect', 'sslconnect', 'error',
 * 'end', and 'message' (only once a 'message' listener has been added).
 */
class Connection extends EventEmitter {
  /**
   * @param {object} [config] - Optional settings; any falsy value is treated as `{}`.
   * @param {object} [config.stream] - Stream to use instead of a new `net.Socket`.
   * @param {*} [config.keepAlive] - When truthy, keep-alive is switched on after the stream connects.
   * @param {*} [config.keepAliveInitialDelayMillis] - Second argument given to `stream.setKeepAlive`.
   * @param {boolean|object} [config.ssl] - Falsy: no SSL negotiation in `connect`. `true`: negotiate with
   *   default TLS options. Object: negotiate and copy its properties onto the TLS options.
   */
  constructor(config) {
    super()
    const settings = config || {}

    this.stream = settings.stream || new net.Socket()
    this._keepAlive = settings.keepAlive
    this._keepAliveInitialDelayMillis = settings.keepAliveInitialDelayMillis
    this.lastBuffer = false
    this.parsedStatements = {}
    this.ssl = settings.ssl || false
    this._ending = false
    this._emitMessage = false

    // The generic 'message' event is only emitted once somebody has
    // subscribed to it; this flag is never reset afterwards.
    this.on('newListener', (eventName) => {
      if (eventName === 'message') {
        this._emitMessage = true
      }
    })
  }

  /**
   * Starts connecting the stream and wires stream events to this emitter.
   *
   * Without `this.ssl` the message parser is attached immediately. With
   * `this.ssl` set, the first chunk of data is read as the server's answer to
   * an SSL request: 'S' upgrades the stream via `tls.connect`, anything else
   * ends the stream and emits 'error'.
   *
   * @param {*} port - Forwarded to `stream.connect`.
   * @param {*} host - Forwarded to `stream.connect`; also used as the TLS
   *   `servername` when `net.isIP(host)` reports it is not an IP address.
   */
  connect(port, host) {
    this._connecting = true
    this.stream.setNoDelay(true)
    this.stream.connect(port, host)

    this.stream.once('connect', () => {
      if (this._keepAlive) {
        this.stream.setKeepAlive(true, this._keepAliveInitialDelayMillis)
      }
      this.emit('connect')
    })

    // Forwards stream errors, except disconnect-type errors while ending.
    const ignoredWhileEnding = ['ECONNRESET', 'EPIPE']
    const reportStreamError = (error) => {
      if (this._ending && ignoredWhileEnding.includes(error.code)) {
        return
      }
      this.emit('error', error)
    }
    this.stream.on('error', reportStreamError)

    this.stream.on('close', () => {
      this.emit('end')
    })

    if (!this.ssl) {
      return this.attachListeners(this.stream)
    }

    this.stream.once('data', (buffer) => {
      const responseCode = buffer.toString('utf8')

      if (responseCode === 'N') {
        // Server does not support SSL connections
        this.stream.end()
        return this.emit('error', new Error('The server does not support SSL connections'))
      }
      if (responseCode !== 'S') {
        // Any other response byte, including 'E' (ErrorResponse) indicating a server error
        this.stream.end()
        return this.emit('error', new Error('There was an error establishing an SSL connection'))
      }

      // 'S': server supports SSL connections, continue with a secure connection.
      const tls = require('tls')
      const tlsOptions = { socket: this.stream }

      if (this.ssl !== true) {
        Object.assign(tlsOptions, this.ssl)
        // Object.assign only copies own enumerable properties; copy `key`
        // explicitly so it is carried over whenever `'key' in this.ssl` holds.
        if ('key' in this.ssl) {
          tlsOptions.key = this.ssl.key
        }
      }

      if (net.isIP(host) === 0) {
        tlsOptions.servername = host
      }

      try {
        this.stream = tls.connect(tlsOptions)
      } catch (err) {
        return this.emit('error', err)
      }

      // From here on `this.stream` is the TLS stream: parse it and report its errors.
      this.attachListeners(this.stream)
      this.stream.on('error', reportStreamError)
      this.emit('sslconnect')
    })
  }

  /**
   * Re-emits the stream's 'end' event and feeds the stream to `parse`,
   * emitting one event per parsed message.
   *
   * @param {object} stream - Stream to listen on and parse.
   */
  attachListeners(stream) {
    stream.on('end', () => {
      this.emit('end')
    })

    parse(stream, (msg) => {
      // Decide the event name before any 'message' listener runs.
      let eventName = msg.name
      if (eventName === 'error') {
        // Avoid colliding with EventEmitter's special 'error' event.
        eventName = 'errorMessage'
      }
      if (this._emitMessage) {
        this.emit('message', msg)
      }
      this.emit(eventName, msg)
    })
  }

  /** Writes `serialize.requestSsl()` straight to the stream (no writable check). */
  requestSsl() {
    const payload = serialize.requestSsl()
    this.stream.write(payload)
  }

  /**
   * Writes `serialize.startup(config)` straight to the stream (no writable check).
   * @param {*} config - Passed through to `serialize.startup`.
   */
  startup(config) {
    const payload = serialize.startup(config)
    this.stream.write(payload)
  }

  /**
   * Sends `serialize.cancel(processID, secretKey)`.
   * @param {*} processID - Passed through to `serialize.cancel`.
   * @param {*} secretKey - Passed through to `serialize.cancel`.
   */
  cancel(processID, secretKey) {
    const payload = serialize.cancel(processID, secretKey)
    this._send(payload)
  }

  /**
   * Sends `serialize.password(password)`.
   * @param {*} password - Passed through to `serialize.password`.
   */
  password(password) {
    const payload = serialize.password(password)
    this._send(payload)
  }

  /**
   * Sends `serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)`.
   * @param {*} mechanism - Passed through to the serializer.
   * @param {*} initialResponse - Passed through to the serializer.
   */
  sendSASLInitialResponseMessage(mechanism, initialResponse) {
    const payload = serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)
    this._send(payload)
  }

  /**
   * Sends `serialize.sendSCRAMClientFinalMessage(additionalData)`.
   * @param {*} additionalData - Passed through to the serializer.
   */
  sendSCRAMClientFinalMessage(additionalData) {
    const payload = serialize.sendSCRAMClientFinalMessage(additionalData)
    this._send(payload)
  }

  /**
   * Writes `buffer` to the stream if the stream is writable.
   *
   * @param {*} buffer - Value handed to `stream.write`.
   * @returns {*} `false` when the stream is not writable, otherwise whatever
   *   `stream.write(buffer)` returns.
   */
  _send(buffer) {
    return this.stream.writable ? this.stream.write(buffer) : false
  }

  /**
   * Sends `serialize.query(text)`.
   * @param {*} text - Passed through to `serialize.query`.
   */
  query(text) {
    const payload = serialize.query(text)
    this._send(payload)
  }

  /**
   * Sends a parse message: `serialize.parse(query)`.
   * @param {*} query - Passed through to `serialize.parse`.
   */
  parse(query) {
    const payload = serialize.parse(query)
    this._send(payload)
  }

  /**
   * Sends a bind message: `serialize.bind(config)`.
   * @param {*} config - Passed through to `serialize.bind`.
   */
  bind(config) {
    const payload = serialize.bind(config)
    this._send(payload)
  }

  /**
   * Sends an execute message: `serialize.execute(config)`.
   * @param {*} config - Passed through to `serialize.execute`.
   */
  execute(config) {
    const payload = serialize.execute(config)
    this._send(payload)
  }

  /** Writes the shared flush buffer if the stream is writable; otherwise does nothing. */
  flush() {
    if (!this.stream.writable) {
      return
    }
    this.stream.write(flushBuffer)
  }

  /**
   * Sends the shared flush buffer followed by the shared sync buffer.
   *
   * NOTE(review): this also sets `_ending`, so from the first sync onwards the
   * stream error handler installed by `connect` drops ECONNRESET / EPIPE
   * errors, not only after `end()` - confirm that this is intended.
   */
  sync() {
    this._ending = true
    this._send(flushBuffer)
    this._send(syncBuffer)
  }

  /** Calls `ref()` on the underlying stream. */
  ref() {
    this.stream.ref()
  }

  /** Calls `unref()` on the underlying stream. */
  unref() {
    this.stream.unref()
  }

  /**
   * Marks the connection as ending and closes the stream.
   *
   * If `connect` has been called and the stream is still writable, the shared
   * end buffer is written first and the stream is ended from the write
   * callback. Otherwise the stream is ended right away.
   *
   * @returns {*} The result of `stream.write` when the end buffer is sent,
   *   otherwise `undefined`.
   */
  end() {
    this._ending = true

    const canSayGoodbye = this._connecting && this.stream.writable
    if (!canSayGoodbye) {
      this.stream.end()
      return
    }

    return this.stream.write(endBuffer, () => {
      this.stream.end()
    })
  }

  /**
   * Sends `serialize.close(msg)`.
   * @param {*} msg - Passed through to `serialize.close`.
   */
  close(msg) {
    const payload = serialize.close(msg)
    this._send(payload)
  }

  /**
   * Sends `serialize.describe(msg)`.
   * @param {*} msg - Passed through to `serialize.describe`.
   */
  describe(msg) {
    const payload = serialize.describe(msg)
    this._send(payload)
  }

  /**
   * Sends `serialize.copyData(chunk)`.
   * @param {*} chunk - Passed through to `serialize.copyData`.
   */
  sendCopyFromChunk(chunk) {
    const payload = serialize.copyData(chunk)
    this._send(payload)
  }

  /** Sends `serialize.copyDone()`. */
  endCopyFrom() {
    const payload = serialize.copyDone()
    this._send(payload)
  }

  /**
   * Sends `serialize.copyFail(msg)`.
   * @param {*} msg - Passed through to `serialize.copyFail`.
   */
  sendCopyFail(msg) {
    const payload = serialize.copyFail(msg)
    this._send(payload)
  }
}
// The Connection class is this module's only export.
module.exports = Connection