Refactor connection to class

This commit is contained in:
Brian M. Carlson 2020-07-15 11:49:54 -05:00
parent 9d1dce9c5d
commit 5ba7e3fb48

View File

@ -13,201 +13,201 @@ var util = require('util')
const { parse, serialize } = require('pg-protocol')
// TODO(bmc) support binary mode at some point
var Connection = function (config) {
EventEmitter.call(this)
config = config || {}
this.stream = config.stream || new net.Socket()
this._keepAlive = config.keepAlive
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
this.lastBuffer = false
this.parsedStatements = {}
this.ssl = config.ssl || false
this._ending = false
this._emitMessage = false
var self = this
this.on('newListener', function (eventName) {
if (eventName === 'message') {
self._emitMessage = true
}
})
}
util.inherits(Connection, EventEmitter)
Connection.prototype.connect = function (port, host) {
var self = this
this._connecting = true
this.stream.setNoDelay(true)
this.stream.connect(port, host)
this.stream.once('connect', function () {
if (self._keepAlive) {
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
}
self.emit('connect')
})
const reportStreamError = function (error) {
// errors about disconnections should be ignored during disconnect
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
return
}
self.emit('error', error)
}
this.stream.on('error', reportStreamError)
this.stream.on('close', function () {
self.emit('end')
})
if (!this.ssl) {
return this.attachListeners(this.stream)
}
this.stream.once('data', function (buffer) {
var responseCode = buffer.toString('utf8')
switch (responseCode) {
case 'S': // Server supports SSL connections, continue with a secure connection
break
case 'N': // Server does not support SSL connections
self.stream.end()
return self.emit('error', new Error('The server does not support SSL connections'))
default:
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
self.stream.end()
return self.emit('error', new Error('There was an error establishing an SSL connection'))
}
var tls = require('tls')
const options = Object.assign(
{
socket: self.stream,
},
self.ssl
)
if (net.isIP(host) === 0) {
options.servername = host
}
self.stream = tls.connect(options)
self.attachListeners(self.stream)
self.stream.on('error', reportStreamError)
self.emit('sslconnect')
})
}
Connection.prototype.attachListeners = function (stream) {
stream.on('end', () => {
this.emit('end')
})
parse(stream, (msg) => {
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
if (this._emitMessage) {
this.emit('message', msg)
}
this.emit(eventName, msg)
})
}
Connection.prototype.requestSsl = function () {
this.stream.write(serialize.requestSsl())
}
Connection.prototype.startup = function (config) {
this.stream.write(serialize.startup(config))
}
Connection.prototype.cancel = function (processID, secretKey) {
this._send(serialize.cancel(processID, secretKey))
}
Connection.prototype.password = function (password) {
this._send(serialize.password(password))
}
Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) {
this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
}
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
}
Connection.prototype._send = function (buffer) {
if (!this.stream.writable) {
return false
}
return this.stream.write(buffer)
}
Connection.prototype.query = function (text) {
this._send(serialize.query(text))
}
// send parse message
Connection.prototype.parse = function (query) {
this._send(serialize.parse(query))
}
// send bind message
// "more" === true to buffer the message until flush() is called
Connection.prototype.bind = function (config) {
this._send(serialize.bind(config))
}
// send execute message
// "more" === true to buffer the message until flush() is called
Connection.prototype.execute = function (config) {
this._send(serialize.execute(config))
}
const flushBuffer = serialize.flush()
Connection.prototype.flush = function () {
if (this.stream.writable) {
this.stream.write(flushBuffer)
}
}
const syncBuffer = serialize.sync()
Connection.prototype.sync = function () {
this._ending = true
this._send(flushBuffer)
this._send(syncBuffer)
}
const endBuffer = serialize.end()
Connection.prototype.end = function () {
// 0x58 = 'X'
this._ending = true
if (!this._connecting || !this.stream.writable) {
this.stream.end()
return
// TODO(bmc) support binary mode at some point
class Connection extends EventEmitter {
constructor(config) {
super()
config = config || {}
this.stream = config.stream || new net.Socket()
this._keepAlive = config.keepAlive
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
this.lastBuffer = false
this.parsedStatements = {}
this.ssl = config.ssl || false
this._ending = false
this._emitMessage = false
var self = this
this.on('newListener', function (eventName) {
if (eventName === 'message') {
self._emitMessage = true
}
})
}
return this.stream.write(endBuffer, () => {
this.stream.end()
})
}
Connection.prototype.close = function (msg) {
this._send(serialize.close(msg))
}
connect(port, host) {
var self = this
Connection.prototype.describe = function (msg) {
this._send(serialize.describe(msg))
}
this._connecting = true
this.stream.setNoDelay(true)
this.stream.connect(port, host)
Connection.prototype.sendCopyFromChunk = function (chunk) {
this._send(serialize.copyData(chunk))
}
this.stream.once('connect', function () {
if (self._keepAlive) {
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
}
self.emit('connect')
})
Connection.prototype.endCopyFrom = function () {
this._send(serialize.copyDone())
}
const reportStreamError = function (error) {
// errors about disconnections should be ignored during disconnect
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
return
}
self.emit('error', error)
}
this.stream.on('error', reportStreamError)
Connection.prototype.sendCopyFail = function (msg) {
this._send(serialize.copyFail(msg))
this.stream.on('close', function () {
self.emit('end')
})
if (!this.ssl) {
return this.attachListeners(this.stream)
}
this.stream.once('data', function (buffer) {
var responseCode = buffer.toString('utf8')
switch (responseCode) {
case 'S': // Server supports SSL connections, continue with a secure connection
break
case 'N': // Server does not support SSL connections
self.stream.end()
return self.emit('error', new Error('The server does not support SSL connections'))
default:
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
self.stream.end()
return self.emit('error', new Error('There was an error establishing an SSL connection'))
}
var tls = require('tls')
const options = Object.assign(
{
socket: self.stream,
},
self.ssl
)
if (net.isIP(host) === 0) {
options.servername = host
}
self.stream = tls.connect(options)
self.attachListeners(self.stream)
self.stream.on('error', reportStreamError)
self.emit('sslconnect')
})
}
attachListeners(stream) {
stream.on('end', () => {
this.emit('end')
})
parse(stream, (msg) => {
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
if (this._emitMessage) {
this.emit('message', msg)
}
this.emit(eventName, msg)
})
}
requestSsl() {
this.stream.write(serialize.requestSsl())
}
startup(config) {
this.stream.write(serialize.startup(config))
}
cancel(processID, secretKey) {
this._send(serialize.cancel(processID, secretKey))
}
password(password) {
this._send(serialize.password(password))
}
sendSASLInitialResponseMessage(mechanism, initialResponse) {
this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
}
sendSCRAMClientFinalMessage(additionalData) {
this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
}
_send(buffer) {
if (!this.stream.writable) {
return false
}
return this.stream.write(buffer)
}
query(text) {
this._send(serialize.query(text))
}
// send parse message
parse(query) {
this._send(serialize.parse(query))
}
// send bind message
// "more" === true to buffer the message until flush() is called
bind(config) {
this._send(serialize.bind(config))
}
// send execute message
// "more" === true to buffer the message until flush() is called
execute(config) {
this._send(serialize.execute(config))
}
flush() {
if (this.stream.writable) {
this.stream.write(flushBuffer)
}
}
sync() {
this._ending = true
this._send(flushBuffer)
this._send(syncBuffer)
}
end() {
// 0x58 = 'X'
this._ending = true
if (!this._connecting || !this.stream.writable) {
this.stream.end()
return
}
return this.stream.write(endBuffer, () => {
this.stream.end()
})
}
close(msg) {
this._send(serialize.close(msg))
}
describe(msg) {
this._send(serialize.describe(msg))
}
sendCopyFromChunk(chunk) {
this._send(serialize.copyData(chunk))
}
endCopyFrom() {
this._send(serialize.copyDone())
}
sendCopyFail(msg) {
this._send(serialize.copyFail(msg))
}
}
module.exports = Connection