From 5ba7e3fb48f70ac749aea0d1ffa0cfbd45fec6e2 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 11:49:54 -0500 Subject: [PATCH] Refactor connection to class --- packages/pg/lib/connection.js | 370 +++++++++++++++++----------------- 1 file changed, 185 insertions(+), 185 deletions(-) diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 65867026..0aa3c096 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -13,201 +13,201 @@ var util = require('util') const { parse, serialize } = require('pg-protocol') -// TODO(bmc) support binary mode at some point -var Connection = function (config) { - EventEmitter.call(this) - config = config || {} - this.stream = config.stream || new net.Socket() - this._keepAlive = config.keepAlive - this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis - this.lastBuffer = false - this.parsedStatements = {} - this.ssl = config.ssl || false - this._ending = false - this._emitMessage = false - var self = this - this.on('newListener', function (eventName) { - if (eventName === 'message') { - self._emitMessage = true - } - }) -} - -util.inherits(Connection, EventEmitter) - -Connection.prototype.connect = function (port, host) { - var self = this - - this._connecting = true - this.stream.setNoDelay(true) - this.stream.connect(port, host) - - this.stream.once('connect', function () { - if (self._keepAlive) { - self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) - } - self.emit('connect') - }) - - const reportStreamError = function (error) { - // errors about disconnections should be ignored during disconnect - if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) { - return - } - self.emit('error', error) - } - this.stream.on('error', reportStreamError) - - this.stream.on('close', function () { - self.emit('end') - }) - - if (!this.ssl) { - return this.attachListeners(this.stream) - } - - this.stream.once('data', function (buffer) { - var responseCode = buffer.toString('utf8') - switch (responseCode) { - case 'S': // Server supports SSL connections, continue with a secure connection - break - case 'N': // Server does not support SSL connections - self.stream.end() - return self.emit('error', new Error('The server does not support SSL connections')) - default: - // Any other response byte, including 'E' (ErrorResponse) indicating a server error - self.stream.end() - return self.emit('error', new Error('There was an error establishing an SSL connection')) - } - var tls = require('tls') - const options = Object.assign( - { - socket: self.stream, - }, - self.ssl - ) - if (net.isIP(host) === 0) { - options.servername = host - } - self.stream = tls.connect(options) - self.attachListeners(self.stream) - self.stream.on('error', reportStreamError) - - self.emit('sslconnect') - }) -} - -Connection.prototype.attachListeners = function (stream) { - stream.on('end', () => { - this.emit('end') - }) - parse(stream, (msg) => { - var eventName = msg.name === 'error' ? 'errorMessage' : msg.name - if (this._emitMessage) { - this.emit('message', msg) - } - this.emit(eventName, msg) - }) -} - -Connection.prototype.requestSsl = function () { - this.stream.write(serialize.requestSsl()) -} - -Connection.prototype.startup = function (config) { - this.stream.write(serialize.startup(config)) -} - -Connection.prototype.cancel = function (processID, secretKey) { - this._send(serialize.cancel(processID, secretKey)) -} - -Connection.prototype.password = function (password) { - this._send(serialize.password(password)) -} - -Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) { - this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) -} - -Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) { - this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) -} - -Connection.prototype._send = function (buffer) { - if (!this.stream.writable) { - return false - } - return this.stream.write(buffer) -} - -Connection.prototype.query = function (text) { - this._send(serialize.query(text)) -} - -// send parse message -Connection.prototype.parse = function (query) { - this._send(serialize.parse(query)) -} - -// send bind message -// "more" === true to buffer the message until flush() is called -Connection.prototype.bind = function (config) { - this._send(serialize.bind(config)) -} - -// send execute message -// "more" === true to buffer the message until flush() is called -Connection.prototype.execute = function (config) { - this._send(serialize.execute(config)) -} - const flushBuffer = serialize.flush() -Connection.prototype.flush = function () { - if (this.stream.writable) { - this.stream.write(flushBuffer) - } -} - const syncBuffer = serialize.sync() -Connection.prototype.sync = function () { - this._ending = true - this._send(flushBuffer) - this._send(syncBuffer) -} - const endBuffer = serialize.end() -Connection.prototype.end = function () { - // 0x58 = 'X' - this._ending = true - if (!this._connecting || !this.stream.writable) { - this.stream.end() - return +// TODO(bmc) support binary mode at some point +class Connection extends EventEmitter { + constructor(config) { + super() + config = config || {} + this.stream = config.stream || new net.Socket() + this._keepAlive = config.keepAlive + this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis + this.lastBuffer = false + this.parsedStatements = {} + this.ssl = config.ssl || false + this._ending = false + this._emitMessage = false + var self = this + this.on('newListener', function (eventName) { + if (eventName === 'message') { + self._emitMessage = true + } + }) } - return this.stream.write(endBuffer, () => { - this.stream.end() - }) -} -Connection.prototype.close = function (msg) { - this._send(serialize.close(msg)) -} + connect(port, host) { + var self = this -Connection.prototype.describe = function (msg) { - this._send(serialize.describe(msg)) -} + this._connecting = true + this.stream.setNoDelay(true) + this.stream.connect(port, host) -Connection.prototype.sendCopyFromChunk = function (chunk) { - this._send(serialize.copyData(chunk)) -} + this.stream.once('connect', function () { + if (self._keepAlive) { + self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) + } + self.emit('connect') + }) -Connection.prototype.endCopyFrom = function () { - this._send(serialize.copyDone()) -} + const reportStreamError = function (error) { + // errors about disconnections should be ignored during disconnect + if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) { + return + } + self.emit('error', error) + } + this.stream.on('error', reportStreamError) -Connection.prototype.sendCopyFail = function (msg) { - this._send(serialize.copyFail(msg)) + this.stream.on('close', function () { + self.emit('end') + }) + + if (!this.ssl) { + return this.attachListeners(this.stream) + } + + this.stream.once('data', function (buffer) { + var responseCode = buffer.toString('utf8') + switch (responseCode) { + case 'S': // Server supports SSL connections, continue with a secure connection + break + case 'N': // Server does not support SSL connections + self.stream.end() + return self.emit('error', new Error('The server does not support SSL connections')) + default: + // Any other response byte, including 'E' (ErrorResponse) indicating a server error + self.stream.end() + return self.emit('error', new Error('There was an error establishing an SSL connection')) + } + var tls = require('tls') + const options = Object.assign( + { + socket: self.stream, + }, + self.ssl + ) + if (net.isIP(host) === 0) { + options.servername = host + } + self.stream = tls.connect(options) + self.attachListeners(self.stream) + self.stream.on('error', reportStreamError) + + self.emit('sslconnect') + }) + } + + attachListeners(stream) { + stream.on('end', () => { + this.emit('end') + }) + parse(stream, (msg) => { + var eventName = msg.name === 'error' ? 'errorMessage' : msg.name + if (this._emitMessage) { + this.emit('message', msg) + } + this.emit(eventName, msg) + }) + } + + requestSsl() { + this.stream.write(serialize.requestSsl()) + } + + startup(config) { + this.stream.write(serialize.startup(config)) + } + + cancel(processID, secretKey) { + this._send(serialize.cancel(processID, secretKey)) + } + + password(password) { + this._send(serialize.password(password)) + } + + sendSASLInitialResponseMessage(mechanism, initialResponse) { + this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) + } + + sendSCRAMClientFinalMessage(additionalData) { + this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) + } + + _send(buffer) { + if (!this.stream.writable) { + return false + } + return this.stream.write(buffer) + } + + query(text) { + this._send(serialize.query(text)) + } + + // send parse message + parse(query) { + this._send(serialize.parse(query)) + } + + // send bind message + // "more" === true to buffer the message until flush() is called + bind(config) { + this._send(serialize.bind(config)) + } + + // send execute message + // "more" === true to buffer the message until flush() is called + execute(config) { + this._send(serialize.execute(config)) + } + + flush() { + if (this.stream.writable) { + this.stream.write(flushBuffer) + } + } + + sync() { + this._ending = true + this._send(flushBuffer) + this._send(syncBuffer) + } + + end() { + // 0x58 = 'X' + this._ending = true + if (!this._connecting || !this.stream.writable) { + this.stream.end() + return + } + return this.stream.write(endBuffer, () => { + this.stream.end() + }) + } + + close(msg) { + this._send(serialize.close(msg)) + } + + describe(msg) { + this._send(serialize.describe(msg)) + } + + sendCopyFromChunk(chunk) { + this._send(serialize.copyData(chunk)) + } + + endCopyFrom() { + this._send(serialize.copyDone()) + } + + sendCopyFail(msg) { + this._send(serialize.copyFail(msg)) + } } module.exports = Connection