mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
Merge pull request #2210 from brianc/bmc/switch-to-fast-connection
Switch internals to use faster connection
This commit is contained in:
commit
5930e4fa38
18
.travis.yml
18
.travis.yml
@ -7,20 +7,18 @@ before_script: |
|
||||
|
||||
env:
|
||||
- CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres
|
||||
# test w/ new faster parsing code
|
||||
- CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres PG_FAST_CONNECTION=true
|
||||
|
||||
node_js:
|
||||
- lts/dubnium
|
||||
- lts/erbium
|
||||
# node 13.7 seems to have changed behavior of async iterators exiting early on streams
|
||||
# if 13.8 still has this problem when it comes down I'll talk to the node team about the change
|
||||
# in the mean time...peg to 13.6
|
||||
# in the mean time...peg to 13.6
|
||||
- 13.6
|
||||
- 14
|
||||
|
||||
addons:
|
||||
postgresql: "10"
|
||||
postgresql: '10'
|
||||
|
||||
matrix:
|
||||
include:
|
||||
@ -42,25 +40,25 @@ matrix:
|
||||
|
||||
- node_js: lts/carbon
|
||||
addons:
|
||||
postgresql: "9.5"
|
||||
postgresql: '9.5'
|
||||
dist: precise
|
||||
|
||||
# different PostgreSQL versions on Node LTS
|
||||
- node_js: lts/erbium
|
||||
addons:
|
||||
postgresql: "9.3"
|
||||
postgresql: '9.3'
|
||||
- node_js: lts/erbium
|
||||
addons:
|
||||
postgresql: "9.4"
|
||||
postgresql: '9.4'
|
||||
- node_js: lts/erbium
|
||||
addons:
|
||||
postgresql: "9.5"
|
||||
postgresql: '9.5'
|
||||
- node_js: lts/erbium
|
||||
addons:
|
||||
postgresql: "9.6"
|
||||
postgresql: '9.6'
|
||||
|
||||
# PostgreSQL 9.2 only works on precise
|
||||
- node_js: lts/carbon
|
||||
addons:
|
||||
postgresql: "9.2"
|
||||
postgresql: '9.2'
|
||||
dist: precise
|
||||
|
||||
@ -210,8 +210,21 @@ describe('PgPacketStream', function () {
|
||||
testForMessage(md5PasswordBuffer, expectedMD5PasswordMessage)
|
||||
testForMessage(SASLBuffer, expectedSASLMessage)
|
||||
testForMessage(SASLContinueBuffer, expectedSASLContinueMessage)
|
||||
|
||||
// this exercises a found bug in the parser:
|
||||
// https://github.com/brianc/node-postgres/pull/2210#issuecomment-627626084
|
||||
// and adds a test which is deterministic, rather than relying on network packet chunking
|
||||
const extendedSASLContinueBuffer = Buffer.concat([SASLContinueBuffer, Buffer.from([1, 2, 3, 4])])
|
||||
testForMessage(extendedSASLContinueBuffer, expectedSASLContinueMessage)
|
||||
|
||||
testForMessage(SASLFinalBuffer, expectedSASLFinalMessage)
|
||||
|
||||
// this exercises a found bug in the parser:
|
||||
// https://github.com/brianc/node-postgres/pull/2210#issuecomment-627626084
|
||||
// and adds a test which is deterministic, rather than relying on network packet chunking
|
||||
const extendedSASLFinalBuffer = Buffer.concat([SASLFinalBuffer, Buffer.from([1, 2, 4, 5])])
|
||||
testForMessage(extendedSASLFinalBuffer, expectedSASLFinalMessage)
|
||||
|
||||
testForMessage(paramStatusBuffer, expectedParameterStatusMessage)
|
||||
testForMessage(backendKeyDataBuffer, expectedBackendKeyDataMessage)
|
||||
testForMessage(readyForQueryBuffer, expectedReadyForQueryMessage)
|
||||
|
||||
@ -296,11 +296,11 @@ export class Parser {
|
||||
break
|
||||
case 11: // AuthenticationSASLContinue
|
||||
message.name = MessageName.authenticationSASLContinue
|
||||
message.data = this.reader.string(length - 4)
|
||||
message.data = this.reader.string(length - 8)
|
||||
break
|
||||
case 12: // AuthenticationSASLFinal
|
||||
message.name = MessageName.authenticationSASLFinal
|
||||
message.data = this.reader.string(length - 4)
|
||||
message.data = this.reader.string(length - 8)
|
||||
break
|
||||
default:
|
||||
throw new Error('Unknown authenticationOk message type ' + code)
|
||||
|
||||
@ -18,9 +18,6 @@ var ConnectionParameters = require('./connection-parameters')
|
||||
var Query = require('./query')
|
||||
var defaults = require('./defaults')
|
||||
var Connection = require('./connection')
|
||||
if (process.env.PG_FAST_CONNECTION) {
|
||||
Connection = require('./connection-fast')
|
||||
}
|
||||
|
||||
var Client = function (config) {
|
||||
EventEmitter.call(this)
|
||||
|
||||
@ -1,214 +0,0 @@
|
||||
'use strict'
|
||||
/**
|
||||
* Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com)
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* README.md file in the root directory of this source tree.
|
||||
*/
|
||||
|
||||
var net = require('net')
|
||||
var EventEmitter = require('events').EventEmitter
|
||||
var util = require('util')
|
||||
|
||||
const { parse, serialize } = require('pg-protocol')
|
||||
|
||||
// TODO(bmc) support binary mode at some point
|
||||
console.log('***using faster connection***')
|
||||
var Connection = function (config) {
|
||||
EventEmitter.call(this)
|
||||
config = config || {}
|
||||
this.stream = config.stream || new net.Socket()
|
||||
this.stream.setNoDelay(true)
|
||||
this._keepAlive = config.keepAlive
|
||||
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
|
||||
this.lastBuffer = false
|
||||
this.parsedStatements = {}
|
||||
this.ssl = config.ssl || false
|
||||
this._ending = false
|
||||
this._emitMessage = false
|
||||
var self = this
|
||||
this.on('newListener', function (eventName) {
|
||||
if (eventName === 'message') {
|
||||
self._emitMessage = true
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
util.inherits(Connection, EventEmitter)
|
||||
|
||||
Connection.prototype.connect = function (port, host) {
|
||||
var self = this
|
||||
|
||||
this._connecting = true
|
||||
this.stream.connect(port, host)
|
||||
|
||||
this.stream.once('connect', function () {
|
||||
if (self._keepAlive) {
|
||||
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
|
||||
}
|
||||
self.emit('connect')
|
||||
})
|
||||
|
||||
const reportStreamError = function (error) {
|
||||
// errors about disconnections should be ignored during disconnect
|
||||
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
|
||||
return
|
||||
}
|
||||
self.emit('error', error)
|
||||
}
|
||||
this.stream.on('error', reportStreamError)
|
||||
|
||||
this.stream.on('close', function () {
|
||||
self.emit('end')
|
||||
})
|
||||
|
||||
if (!this.ssl) {
|
||||
return this.attachListeners(this.stream)
|
||||
}
|
||||
|
||||
this.stream.once('data', function (buffer) {
|
||||
var responseCode = buffer.toString('utf8')
|
||||
switch (responseCode) {
|
||||
case 'S': // Server supports SSL connections, continue with a secure connection
|
||||
break
|
||||
case 'N': // Server does not support SSL connections
|
||||
self.stream.end()
|
||||
return self.emit('error', new Error('The server does not support SSL connections'))
|
||||
default:
|
||||
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
|
||||
self.stream.end()
|
||||
return self.emit('error', new Error('There was an error establishing an SSL connection'))
|
||||
}
|
||||
var tls = require('tls')
|
||||
const options = Object.assign(
|
||||
{
|
||||
socket: self.stream,
|
||||
},
|
||||
self.ssl
|
||||
)
|
||||
if (net.isIP(host) === 0) {
|
||||
options.servername = host
|
||||
}
|
||||
self.stream = tls.connect(options)
|
||||
self.attachListeners(self.stream)
|
||||
self.stream.on('error', reportStreamError)
|
||||
|
||||
self.emit('sslconnect')
|
||||
})
|
||||
}
|
||||
|
||||
Connection.prototype.attachListeners = function (stream) {
|
||||
stream.on('end', () => {
|
||||
this.emit('end')
|
||||
})
|
||||
parse(stream, (msg) => {
|
||||
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
|
||||
if (this._emitMessage) {
|
||||
this.emit('message', msg)
|
||||
}
|
||||
this.emit(eventName, msg)
|
||||
})
|
||||
}
|
||||
|
||||
Connection.prototype.requestSsl = function () {
|
||||
this.stream.write(serialize.requestSsl())
|
||||
}
|
||||
|
||||
Connection.prototype.startup = function (config) {
|
||||
this.stream.write(serialize.startup(config))
|
||||
}
|
||||
|
||||
Connection.prototype.cancel = function (processID, secretKey) {
|
||||
this._send(serialize.cancel(processID, secretKey))
|
||||
}
|
||||
|
||||
Connection.prototype.password = function (password) {
|
||||
this._send(serialize.password(password))
|
||||
}
|
||||
|
||||
Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) {
|
||||
this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
|
||||
}
|
||||
|
||||
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
|
||||
this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
|
||||
}
|
||||
|
||||
Connection.prototype._send = function (buffer) {
|
||||
if (!this.stream.writable) {
|
||||
return false
|
||||
}
|
||||
return this.stream.write(buffer)
|
||||
}
|
||||
|
||||
Connection.prototype.query = function (text) {
|
||||
this._send(serialize.query(text))
|
||||
}
|
||||
|
||||
// send parse message
|
||||
Connection.prototype.parse = function (query) {
|
||||
this._send(serialize.parse(query))
|
||||
}
|
||||
|
||||
// send bind message
|
||||
// "more" === true to buffer the message until flush() is called
|
||||
Connection.prototype.bind = function (config) {
|
||||
this._send(serialize.bind(config))
|
||||
}
|
||||
|
||||
// send execute message
|
||||
// "more" === true to buffer the message until flush() is called
|
||||
Connection.prototype.execute = function (config) {
|
||||
this._send(serialize.execute(config))
|
||||
}
|
||||
|
||||
const flushBuffer = serialize.flush()
|
||||
Connection.prototype.flush = function () {
|
||||
if (this.stream.writable) {
|
||||
this.stream.write(flushBuffer)
|
||||
}
|
||||
}
|
||||
|
||||
const syncBuffer = serialize.sync()
|
||||
Connection.prototype.sync = function () {
|
||||
this._ending = true
|
||||
this._send(syncBuffer)
|
||||
this._send(flushBuffer)
|
||||
}
|
||||
|
||||
const endBuffer = serialize.end()
|
||||
|
||||
Connection.prototype.end = function () {
|
||||
// 0x58 = 'X'
|
||||
this._ending = true
|
||||
if (!this._connecting || !this.stream.writable) {
|
||||
this.stream.end()
|
||||
return
|
||||
}
|
||||
return this.stream.write(endBuffer, () => {
|
||||
this.stream.end()
|
||||
})
|
||||
}
|
||||
|
||||
Connection.prototype.close = function (msg) {
|
||||
this._send(serialize.close(msg))
|
||||
}
|
||||
|
||||
Connection.prototype.describe = function (msg) {
|
||||
this._send(serialize.describe(msg))
|
||||
}
|
||||
|
||||
Connection.prototype.sendCopyFromChunk = function (chunk) {
|
||||
this._send(serialize.copyData(chunk))
|
||||
}
|
||||
|
||||
Connection.prototype.endCopyFrom = function () {
|
||||
this._send(serialize.copyDone())
|
||||
}
|
||||
|
||||
Connection.prototype.sendCopyFail = function (msg) {
|
||||
this._send(serialize.copyFail(msg))
|
||||
}
|
||||
|
||||
module.exports = Connection
|
||||
@ -11,11 +11,9 @@ var net = require('net')
|
||||
var EventEmitter = require('events').EventEmitter
|
||||
var util = require('util')
|
||||
|
||||
var Writer = require('buffer-writer')
|
||||
var Reader = require('packet-reader')
|
||||
const { parse, serialize } = require('pg-protocol')
|
||||
|
||||
var TEXT_MODE = 0
|
||||
var BINARY_MODE = 1
|
||||
// TODO(bmc) support binary mode at some point
|
||||
var Connection = function (config) {
|
||||
EventEmitter.call(this)
|
||||
config = config || {}
|
||||
@ -23,20 +21,10 @@ var Connection = function (config) {
|
||||
this._keepAlive = config.keepAlive
|
||||
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
|
||||
this.lastBuffer = false
|
||||
this.lastOffset = 0
|
||||
this.buffer = null
|
||||
this.offset = null
|
||||
this.encoding = config.encoding || 'utf8'
|
||||
this.parsedStatements = {}
|
||||
this.writer = new Writer()
|
||||
this.ssl = config.ssl || false
|
||||
this._ending = false
|
||||
this._mode = TEXT_MODE
|
||||
this._emitMessage = false
|
||||
this._reader = new Reader({
|
||||
headerSize: 1,
|
||||
lengthPadding: -4,
|
||||
})
|
||||
var self = this
|
||||
this.on('newListener', function (eventName) {
|
||||
if (eventName === 'message') {
|
||||
@ -51,6 +39,7 @@ Connection.prototype.connect = function (port, host) {
|
||||
var self = this
|
||||
|
||||
this._connecting = true
|
||||
this.stream.setNoDelay(true)
|
||||
this.stream.connect(port, host)
|
||||
|
||||
this.stream.once('connect', function () {
|
||||
@ -101,576 +90,124 @@ Connection.prototype.connect = function (port, host) {
|
||||
options.servername = host
|
||||
}
|
||||
self.stream = tls.connect(options)
|
||||
self.stream.on('error', reportStreamError)
|
||||
self.attachListeners(self.stream)
|
||||
self.stream.on('error', reportStreamError)
|
||||
|
||||
self.emit('sslconnect')
|
||||
})
|
||||
}
|
||||
|
||||
Connection.prototype.attachListeners = function (stream) {
|
||||
var self = this
|
||||
stream.on('data', function (buff) {
|
||||
self._reader.addChunk(buff)
|
||||
var packet = self._reader.read()
|
||||
while (packet) {
|
||||
var msg = self.parseMessage(packet)
|
||||
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
|
||||
if (self._emitMessage) {
|
||||
self.emit('message', msg)
|
||||
}
|
||||
self.emit(eventName, msg)
|
||||
packet = self._reader.read()
|
||||
}
|
||||
stream.on('end', () => {
|
||||
this.emit('end')
|
||||
})
|
||||
stream.on('end', function () {
|
||||
self.emit('end')
|
||||
parse(stream, (msg) => {
|
||||
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
|
||||
if (this._emitMessage) {
|
||||
this.emit('message', msg)
|
||||
}
|
||||
this.emit(eventName, msg)
|
||||
})
|
||||
}
|
||||
|
||||
Connection.prototype.requestSsl = function () {
|
||||
var bodyBuffer = this.writer.addInt16(0x04d2).addInt16(0x162f).flush()
|
||||
|
||||
var length = bodyBuffer.length + 4
|
||||
|
||||
var buffer = new Writer().addInt32(length).add(bodyBuffer).join()
|
||||
this.stream.write(buffer)
|
||||
this.stream.write(serialize.requestSsl())
|
||||
}
|
||||
|
||||
Connection.prototype.startup = function (config) {
|
||||
var writer = this.writer.addInt16(3).addInt16(0)
|
||||
|
||||
Object.keys(config).forEach(function (key) {
|
||||
var val = config[key]
|
||||
writer.addCString(key).addCString(val)
|
||||
})
|
||||
|
||||
writer.addCString('client_encoding').addCString("'utf-8'")
|
||||
|
||||
var bodyBuffer = writer.addCString('').flush()
|
||||
// this message is sent without a code
|
||||
|
||||
var length = bodyBuffer.length + 4
|
||||
|
||||
var buffer = new Writer().addInt32(length).add(bodyBuffer).join()
|
||||
this.stream.write(buffer)
|
||||
this.stream.write(serialize.startup(config))
|
||||
}
|
||||
|
||||
Connection.prototype.cancel = function (processID, secretKey) {
|
||||
var bodyBuffer = this.writer.addInt16(1234).addInt16(5678).addInt32(processID).addInt32(secretKey).flush()
|
||||
|
||||
var length = bodyBuffer.length + 4
|
||||
|
||||
var buffer = new Writer().addInt32(length).add(bodyBuffer).join()
|
||||
this.stream.write(buffer)
|
||||
this._send(serialize.cancel(processID, secretKey))
|
||||
}
|
||||
|
||||
Connection.prototype.password = function (password) {
|
||||
// 0x70 = 'p'
|
||||
this._send(0x70, this.writer.addCString(password))
|
||||
this._send(serialize.password(password))
|
||||
}
|
||||
|
||||
Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) {
|
||||
// 0x70 = 'p'
|
||||
this.writer.addCString(mechanism).addInt32(Buffer.byteLength(initialResponse)).addString(initialResponse)
|
||||
|
||||
this._send(0x70)
|
||||
this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
|
||||
}
|
||||
|
||||
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
|
||||
// 0x70 = 'p'
|
||||
this.writer.addString(additionalData)
|
||||
|
||||
this._send(0x70)
|
||||
this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
|
||||
}
|
||||
|
||||
Connection.prototype._send = function (code, more) {
|
||||
Connection.prototype._send = function (buffer) {
|
||||
if (!this.stream.writable) {
|
||||
return false
|
||||
}
|
||||
if (more === true) {
|
||||
this.writer.addHeader(code)
|
||||
} else {
|
||||
return this.stream.write(this.writer.flush(code))
|
||||
}
|
||||
return this.stream.write(buffer)
|
||||
}
|
||||
|
||||
Connection.prototype.query = function (text) {
|
||||
// 0x51 = Q
|
||||
this.stream.write(this.writer.addCString(text).flush(0x51))
|
||||
this._send(serialize.query(text))
|
||||
}
|
||||
|
||||
// send parse message
|
||||
// "more" === true to buffer the message until flush() is called
|
||||
Connection.prototype.parse = function (query, more) {
|
||||
// expect something like this:
|
||||
// { name: 'queryName',
|
||||
// text: 'select * from blah',
|
||||
// types: ['int8', 'bool'] }
|
||||
|
||||
// normalize missing query names to allow for null
|
||||
query.name = query.name || ''
|
||||
if (query.name.length > 63) {
|
||||
/* eslint-disable no-console */
|
||||
console.error('Warning! Postgres only supports 63 characters for query names.')
|
||||
console.error('You supplied %s (%s)', query.name, query.name.length)
|
||||
console.error('This can cause conflicts and silent errors executing queries')
|
||||
/* eslint-enable no-console */
|
||||
}
|
||||
// normalize null type array
|
||||
query.types = query.types || []
|
||||
var len = query.types.length
|
||||
var buffer = this.writer
|
||||
.addCString(query.name) // name of query
|
||||
.addCString(query.text) // actual query text
|
||||
.addInt16(len)
|
||||
for (var i = 0; i < len; i++) {
|
||||
buffer.addInt32(query.types[i])
|
||||
}
|
||||
|
||||
var code = 0x50
|
||||
this._send(code, more)
|
||||
Connection.prototype.parse = function (query) {
|
||||
this._send(serialize.parse(query))
|
||||
}
|
||||
|
||||
// send bind message
|
||||
// "more" === true to buffer the message until flush() is called
|
||||
Connection.prototype.bind = function (config, more) {
|
||||
// normalize config
|
||||
config = config || {}
|
||||
config.portal = config.portal || ''
|
||||
config.statement = config.statement || ''
|
||||
config.binary = config.binary || false
|
||||
var values = config.values || []
|
||||
var len = values.length
|
||||
var useBinary = false
|
||||
for (var j = 0; j < len; j++) {
|
||||
useBinary |= values[j] instanceof Buffer
|
||||
}
|
||||
var buffer = this.writer.addCString(config.portal).addCString(config.statement)
|
||||
if (!useBinary) {
|
||||
buffer.addInt16(0)
|
||||
} else {
|
||||
buffer.addInt16(len)
|
||||
for (j = 0; j < len; j++) {
|
||||
buffer.addInt16(values[j] instanceof Buffer)
|
||||
}
|
||||
}
|
||||
buffer.addInt16(len)
|
||||
for (var i = 0; i < len; i++) {
|
||||
var val = values[i]
|
||||
if (val === null || typeof val === 'undefined') {
|
||||
buffer.addInt32(-1)
|
||||
} else if (val instanceof Buffer) {
|
||||
buffer.addInt32(val.length)
|
||||
buffer.add(val)
|
||||
} else {
|
||||
buffer.addInt32(Buffer.byteLength(val))
|
||||
buffer.addString(val)
|
||||
}
|
||||
}
|
||||
|
||||
if (config.binary) {
|
||||
buffer.addInt16(1) // format codes to use binary
|
||||
buffer.addInt16(1)
|
||||
} else {
|
||||
buffer.addInt16(0) // format codes to use text
|
||||
}
|
||||
// 0x42 = 'B'
|
||||
this._send(0x42, more)
|
||||
Connection.prototype.bind = function (config) {
|
||||
this._send(serialize.bind(config))
|
||||
}
|
||||
|
||||
// send execute message
|
||||
// "more" === true to buffer the message until flush() is called
|
||||
Connection.prototype.execute = function (config, more) {
|
||||
config = config || {}
|
||||
config.portal = config.portal || ''
|
||||
config.rows = config.rows || ''
|
||||
this.writer.addCString(config.portal).addInt32(config.rows)
|
||||
|
||||
// 0x45 = 'E'
|
||||
this._send(0x45, more)
|
||||
Connection.prototype.execute = function (config) {
|
||||
this._send(serialize.execute(config))
|
||||
}
|
||||
|
||||
var emptyBuffer = Buffer.alloc(0)
|
||||
|
||||
const flushBuffer = serialize.flush()
|
||||
Connection.prototype.flush = function () {
|
||||
// 0x48 = 'H'
|
||||
this.writer.add(emptyBuffer)
|
||||
this._send(0x48)
|
||||
if (this.stream.writable) {
|
||||
this.stream.write(flushBuffer)
|
||||
}
|
||||
}
|
||||
|
||||
const syncBuffer = serialize.sync()
|
||||
Connection.prototype.sync = function () {
|
||||
// clear out any pending data in the writer
|
||||
this.writer.flush(0)
|
||||
|
||||
this.writer.add(emptyBuffer)
|
||||
this._ending = true
|
||||
this._send(0x53)
|
||||
this._send(syncBuffer)
|
||||
this._send(flushBuffer)
|
||||
}
|
||||
|
||||
const END_BUFFER = Buffer.from([0x58, 0x00, 0x00, 0x00, 0x04])
|
||||
const endBuffer = serialize.end()
|
||||
|
||||
Connection.prototype.end = function () {
|
||||
// 0x58 = 'X'
|
||||
this.writer.add(emptyBuffer)
|
||||
this._ending = true
|
||||
if (!this._connecting || !this.stream.writable) {
|
||||
this.stream.end()
|
||||
return
|
||||
}
|
||||
return this.stream.write(END_BUFFER, () => {
|
||||
return this.stream.write(endBuffer, () => {
|
||||
this.stream.end()
|
||||
})
|
||||
}
|
||||
|
||||
Connection.prototype.close = function (msg, more) {
|
||||
this.writer.addCString(msg.type + (msg.name || ''))
|
||||
this._send(0x43, more)
|
||||
Connection.prototype.close = function (msg) {
|
||||
this._send(serialize.close(msg))
|
||||
}
|
||||
|
||||
Connection.prototype.describe = function (msg, more) {
|
||||
this.writer.addCString(msg.type + (msg.name || ''))
|
||||
this._send(0x44, more)
|
||||
Connection.prototype.describe = function (msg) {
|
||||
this._send(serialize.describe(msg))
|
||||
}
|
||||
|
||||
Connection.prototype.sendCopyFromChunk = function (chunk) {
|
||||
this.stream.write(this.writer.add(chunk).flush(0x64))
|
||||
this._send(serialize.copyData(chunk))
|
||||
}
|
||||
|
||||
Connection.prototype.endCopyFrom = function () {
|
||||
this.stream.write(this.writer.add(emptyBuffer).flush(0x63))
|
||||
this._send(serialize.copyDone())
|
||||
}
|
||||
|
||||
Connection.prototype.sendCopyFail = function (msg) {
|
||||
// this.stream.write(this.writer.add(emptyBuffer).flush(0x66));
|
||||
this.writer.addCString(msg)
|
||||
this._send(0x66)
|
||||
this._send(serialize.copyFail(msg))
|
||||
}
|
||||
|
||||
var Message = function (name, length) {
|
||||
this.name = name
|
||||
this.length = length
|
||||
}
|
||||
|
||||
Connection.prototype.parseMessage = function (buffer) {
|
||||
this.offset = 0
|
||||
var length = buffer.length + 4
|
||||
switch (this._reader.header) {
|
||||
case 0x52: // R
|
||||
return this.parseR(buffer, length)
|
||||
|
||||
case 0x53: // S
|
||||
return this.parseS(buffer, length)
|
||||
|
||||
case 0x4b: // K
|
||||
return this.parseK(buffer, length)
|
||||
|
||||
case 0x43: // C
|
||||
return this.parseC(buffer, length)
|
||||
|
||||
case 0x5a: // Z
|
||||
return this.parseZ(buffer, length)
|
||||
|
||||
case 0x54: // T
|
||||
return this.parseT(buffer, length)
|
||||
|
||||
case 0x44: // D
|
||||
return this.parseD(buffer, length)
|
||||
|
||||
case 0x45: // E
|
||||
return this.parseE(buffer, length)
|
||||
|
||||
case 0x4e: // N
|
||||
return this.parseN(buffer, length)
|
||||
|
||||
case 0x31: // 1
|
||||
return new Message('parseComplete', length)
|
||||
|
||||
case 0x32: // 2
|
||||
return new Message('bindComplete', length)
|
||||
|
||||
case 0x33: // 3
|
||||
return new Message('closeComplete', length)
|
||||
|
||||
case 0x41: // A
|
||||
return this.parseA(buffer, length)
|
||||
|
||||
case 0x6e: // n
|
||||
return new Message('noData', length)
|
||||
|
||||
case 0x49: // I
|
||||
return new Message('emptyQuery', length)
|
||||
|
||||
case 0x73: // s
|
||||
return new Message('portalSuspended', length)
|
||||
|
||||
case 0x47: // G
|
||||
return this.parseG(buffer, length)
|
||||
|
||||
case 0x48: // H
|
||||
return this.parseH(buffer, length)
|
||||
|
||||
case 0x57: // W
|
||||
return new Message('replicationStart', length)
|
||||
|
||||
case 0x63: // c
|
||||
return new Message('copyDone', length)
|
||||
|
||||
case 0x64: // d
|
||||
return this.parsed(buffer, length)
|
||||
}
|
||||
}
|
||||
|
||||
Connection.prototype.parseR = function (buffer, length) {
|
||||
var code = this.parseInt32(buffer)
|
||||
|
||||
var msg = new Message('authenticationOk', length)
|
||||
|
||||
switch (code) {
|
||||
case 0: // AuthenticationOk
|
||||
return msg
|
||||
case 3: // AuthenticationCleartextPassword
|
||||
if (msg.length === 8) {
|
||||
msg.name = 'authenticationCleartextPassword'
|
||||
return msg
|
||||
}
|
||||
break
|
||||
case 5: // AuthenticationMD5Password
|
||||
if (msg.length === 12) {
|
||||
msg.name = 'authenticationMD5Password'
|
||||
msg.salt = Buffer.alloc(4)
|
||||
buffer.copy(msg.salt, 0, this.offset, this.offset + 4)
|
||||
this.offset += 4
|
||||
return msg
|
||||
}
|
||||
|
||||
break
|
||||
case 10: // AuthenticationSASL
|
||||
msg.name = 'authenticationSASL'
|
||||
msg.mechanisms = []
|
||||
do {
|
||||
var mechanism = this.parseCString(buffer)
|
||||
|
||||
if (mechanism) {
|
||||
msg.mechanisms.push(mechanism)
|
||||
}
|
||||
} while (mechanism)
|
||||
|
||||
return msg
|
||||
case 11: // AuthenticationSASLContinue
|
||||
msg.name = 'authenticationSASLContinue'
|
||||
msg.data = this.readString(buffer, length - 4)
|
||||
|
||||
return msg
|
||||
case 12: // AuthenticationSASLFinal
|
||||
msg.name = 'authenticationSASLFinal'
|
||||
msg.data = this.readString(buffer, length - 4)
|
||||
|
||||
return msg
|
||||
}
|
||||
|
||||
throw new Error('Unknown authenticationOk message type' + util.inspect(msg))
|
||||
}
|
||||
|
||||
Connection.prototype.parseS = function (buffer, length) {
|
||||
var msg = new Message('parameterStatus', length)
|
||||
msg.parameterName = this.parseCString(buffer)
|
||||
msg.parameterValue = this.parseCString(buffer)
|
||||
return msg
|
||||
}
|
||||
|
||||
Connection.prototype.parseK = function (buffer, length) {
|
||||
var msg = new Message('backendKeyData', length)
|
||||
msg.processID = this.parseInt32(buffer)
|
||||
msg.secretKey = this.parseInt32(buffer)
|
||||
return msg
|
||||
}
|
||||
|
||||
Connection.prototype.parseC = function (buffer, length) {
|
||||
var msg = new Message('commandComplete', length)
|
||||
msg.text = this.parseCString(buffer)
|
||||
return msg
|
||||
}
|
||||
|
||||
Connection.prototype.parseZ = function (buffer, length) {
|
||||
var msg = new Message('readyForQuery', length)
|
||||
msg.name = 'readyForQuery'
|
||||
msg.status = this.readString(buffer, 1)
|
||||
return msg
|
||||
}
|
||||
|
||||
var ROW_DESCRIPTION = 'rowDescription'
|
||||
Connection.prototype.parseT = function (buffer, length) {
|
||||
var msg = new Message(ROW_DESCRIPTION, length)
|
||||
msg.fieldCount = this.parseInt16(buffer)
|
||||
var fields = []
|
||||
for (var i = 0; i < msg.fieldCount; i++) {
|
||||
fields.push(this.parseField(buffer))
|
||||
}
|
||||
msg.fields = fields
|
||||
return msg
|
||||
}
|
||||
|
||||
var Field = function () {
|
||||
this.name = null
|
||||
this.tableID = null
|
||||
this.columnID = null
|
||||
this.dataTypeID = null
|
||||
this.dataTypeSize = null
|
||||
this.dataTypeModifier = null
|
||||
this.format = null
|
||||
}
|
||||
|
||||
var FORMAT_TEXT = 'text'
|
||||
var FORMAT_BINARY = 'binary'
|
||||
Connection.prototype.parseField = function (buffer) {
|
||||
var field = new Field()
|
||||
field.name = this.parseCString(buffer)
|
||||
field.tableID = this.parseInt32(buffer)
|
||||
field.columnID = this.parseInt16(buffer)
|
||||
field.dataTypeID = this.parseInt32(buffer)
|
||||
field.dataTypeSize = this.parseInt16(buffer)
|
||||
field.dataTypeModifier = this.parseInt32(buffer)
|
||||
if (this.parseInt16(buffer) === TEXT_MODE) {
|
||||
this._mode = TEXT_MODE
|
||||
field.format = FORMAT_TEXT
|
||||
} else {
|
||||
this._mode = BINARY_MODE
|
||||
field.format = FORMAT_BINARY
|
||||
}
|
||||
return field
|
||||
}
|
||||
|
||||
var DATA_ROW = 'dataRow'
|
||||
var DataRowMessage = function (length, fieldCount) {
|
||||
this.name = DATA_ROW
|
||||
this.length = length
|
||||
this.fieldCount = fieldCount
|
||||
this.fields = []
|
||||
}
|
||||
|
||||
// extremely hot-path code
|
||||
Connection.prototype.parseD = function (buffer, length) {
|
||||
var fieldCount = this.parseInt16(buffer)
|
||||
var msg = new DataRowMessage(length, fieldCount)
|
||||
for (var i = 0; i < fieldCount; i++) {
|
||||
msg.fields.push(this._readValue(buffer))
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
// extremely hot-path code
|
||||
Connection.prototype._readValue = function (buffer) {
|
||||
var length = this.parseInt32(buffer)
|
||||
if (length === -1) return null
|
||||
if (this._mode === TEXT_MODE) {
|
||||
return this.readString(buffer, length)
|
||||
}
|
||||
return this.readBytes(buffer, length)
|
||||
}
|
||||
|
||||
// parses error
|
||||
Connection.prototype.parseE = function (buffer, length, isNotice) {
|
||||
var fields = {}
|
||||
var fieldType = this.readString(buffer, 1)
|
||||
while (fieldType !== '\0') {
|
||||
fields[fieldType] = this.parseCString(buffer)
|
||||
fieldType = this.readString(buffer, 1)
|
||||
}
|
||||
|
||||
// the msg is an Error instance
|
||||
var msg = isNotice ? { message: fields.M } : new Error(fields.M)
|
||||
|
||||
// for compatibility with Message
|
||||
msg.name = isNotice ? 'notice' : 'error'
|
||||
msg.length = length
|
||||
|
||||
msg.severity = fields.S
|
||||
msg.code = fields.C
|
||||
msg.detail = fields.D
|
||||
msg.hint = fields.H
|
||||
msg.position = fields.P
|
||||
msg.internalPosition = fields.p
|
||||
msg.internalQuery = fields.q
|
||||
msg.where = fields.W
|
||||
msg.schema = fields.s
|
||||
msg.table = fields.t
|
||||
msg.column = fields.c
|
||||
msg.dataType = fields.d
|
||||
msg.constraint = fields.n
|
||||
msg.file = fields.F
|
||||
msg.line = fields.L
|
||||
msg.routine = fields.R
|
||||
return msg
|
||||
}
|
||||
|
||||
// same thing, different name
|
||||
Connection.prototype.parseN = function (buffer, length) {
|
||||
var msg = this.parseE(buffer, length, true)
|
||||
msg.name = 'notice'
|
||||
return msg
|
||||
}
|
||||
|
||||
Connection.prototype.parseA = function (buffer, length) {
|
||||
var msg = new Message('notification', length)
|
||||
msg.processId = this.parseInt32(buffer)
|
||||
msg.channel = this.parseCString(buffer)
|
||||
msg.payload = this.parseCString(buffer)
|
||||
return msg
|
||||
}
|
||||
|
||||
Connection.prototype.parseG = function (buffer, length) {
|
||||
var msg = new Message('copyInResponse', length)
|
||||
return this.parseGH(buffer, msg)
|
||||
}
|
||||
|
||||
Connection.prototype.parseH = function (buffer, length) {
|
||||
var msg = new Message('copyOutResponse', length)
|
||||
return this.parseGH(buffer, msg)
|
||||
}
|
||||
|
||||
Connection.prototype.parseGH = function (buffer, msg) {
|
||||
var isBinary = buffer[this.offset] !== 0
|
||||
this.offset++
|
||||
msg.binary = isBinary
|
||||
var columnCount = this.parseInt16(buffer)
|
||||
msg.columnTypes = []
|
||||
for (var i = 0; i < columnCount; i++) {
|
||||
msg.columnTypes.push(this.parseInt16(buffer))
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
Connection.prototype.parsed = function (buffer, length) {
|
||||
var msg = new Message('copyData', length)
|
||||
msg.chunk = this.readBytes(buffer, msg.length - 4)
|
||||
return msg
|
||||
}
|
||||
|
||||
Connection.prototype.parseInt32 = function (buffer) {
|
||||
var value = buffer.readInt32BE(this.offset)
|
||||
this.offset += 4
|
||||
return value
|
||||
}
|
||||
|
||||
Connection.prototype.parseInt16 = function (buffer) {
|
||||
var value = buffer.readInt16BE(this.offset)
|
||||
this.offset += 2
|
||||
return value
|
||||
}
|
||||
|
||||
Connection.prototype.readString = function (buffer, length) {
|
||||
return buffer.toString(this.encoding, this.offset, (this.offset += length))
|
||||
}
|
||||
|
||||
Connection.prototype.readBytes = function (buffer, length) {
|
||||
return buffer.slice(this.offset, (this.offset += length))
|
||||
}
|
||||
|
||||
Connection.prototype.parseCString = function (buffer) {
|
||||
var start = this.offset
|
||||
var end = buffer.indexOf(0, start)
|
||||
this.offset = end + 1
|
||||
return buffer.toString(this.encoding, start, end)
|
||||
}
|
||||
// end parsing methods
|
||||
module.exports = Connection
|
||||
|
||||
@ -5,6 +5,7 @@ var Client = require(__dirname + '/../../../lib/client')
|
||||
|
||||
test('emits end when not in query', function () {
|
||||
var stream = new (require('events').EventEmitter)()
|
||||
stream.setNoDelay = () => {}
|
||||
stream.connect = function () {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
@ -1,205 +0,0 @@
|
||||
'use strict'
|
||||
require(__dirname + '/test-helper')
|
||||
var Connection = require(__dirname + '/../../../lib/connection')
|
||||
var stream = new MemoryStream()
|
||||
var con = new Connection({
|
||||
stream: stream,
|
||||
})
|
||||
con._connecting = true
|
||||
|
||||
assert.received = function (stream, buffer) {
|
||||
assert.lengthIs(stream.packets, 1)
|
||||
var packet = stream.packets.pop()
|
||||
assert.equalBuffers(packet, buffer)
|
||||
}
|
||||
|
||||
test('sends startup message', function () {
|
||||
con.startup({
|
||||
user: 'brian',
|
||||
database: 'bang',
|
||||
})
|
||||
assert.received(
|
||||
stream,
|
||||
new BufferList()
|
||||
.addInt16(3)
|
||||
.addInt16(0)
|
||||
.addCString('user')
|
||||
.addCString('brian')
|
||||
.addCString('database')
|
||||
.addCString('bang')
|
||||
.addCString('client_encoding')
|
||||
.addCString("'utf-8'")
|
||||
.addCString('')
|
||||
.join(true)
|
||||
)
|
||||
})
|
||||
|
||||
test('sends password message', function () {
|
||||
con.password('!')
|
||||
assert.received(stream, new BufferList().addCString('!').join(true, 'p'))
|
||||
})
|
||||
|
||||
test('sends SASLInitialResponseMessage message', function () {
|
||||
con.sendSASLInitialResponseMessage('mech', 'data')
|
||||
assert.received(stream, new BufferList().addCString('mech').addInt32(4).addString('data').join(true, 'p'))
|
||||
})
|
||||
|
||||
test('sends SCRAMClientFinalMessage message', function () {
|
||||
con.sendSCRAMClientFinalMessage('data')
|
||||
assert.received(stream, new BufferList().addString('data').join(true, 'p'))
|
||||
})
|
||||
|
||||
test('sends query message', function () {
|
||||
var txt = 'select * from boom'
|
||||
con.query(txt)
|
||||
assert.received(stream, new BufferList().addCString(txt).join(true, 'Q'))
|
||||
})
|
||||
|
||||
test('sends parse message', function () {
|
||||
con.parse({ text: '!' })
|
||||
var expected = new BufferList().addCString('').addCString('!').addInt16(0).join(true, 'P')
|
||||
assert.received(stream, expected)
|
||||
})
|
||||
|
||||
test('sends parse message with named query', function () {
|
||||
con.parse({
|
||||
name: 'boom',
|
||||
text: 'select * from boom',
|
||||
types: [],
|
||||
})
|
||||
var expected = new BufferList().addCString('boom').addCString('select * from boom').addInt16(0).join(true, 'P')
|
||||
assert.received(stream, expected)
|
||||
|
||||
test('with multiple parameters', function () {
|
||||
con.parse({
|
||||
name: 'force',
|
||||
text: 'select * from bang where name = $1',
|
||||
types: [1, 2, 3, 4],
|
||||
})
|
||||
var expected = new BufferList()
|
||||
.addCString('force')
|
||||
.addCString('select * from bang where name = $1')
|
||||
.addInt16(4)
|
||||
.addInt32(1)
|
||||
.addInt32(2)
|
||||
.addInt32(3)
|
||||
.addInt32(4)
|
||||
.join(true, 'P')
|
||||
assert.received(stream, expected)
|
||||
})
|
||||
})
|
||||
|
||||
test('bind messages', function () {
|
||||
test('with no values', function () {
|
||||
con.bind()
|
||||
|
||||
var expectedBuffer = new BufferList()
|
||||
.addCString('')
|
||||
.addCString('')
|
||||
.addInt16(0)
|
||||
.addInt16(0)
|
||||
.addInt16(0)
|
||||
.join(true, 'B')
|
||||
assert.received(stream, expectedBuffer)
|
||||
})
|
||||
|
||||
test('with named statement, portal, and values', function () {
|
||||
con.bind({
|
||||
portal: 'bang',
|
||||
statement: 'woo',
|
||||
values: ['1', 'hi', null, 'zing'],
|
||||
})
|
||||
var expectedBuffer = new BufferList()
|
||||
.addCString('bang') // portal name
|
||||
.addCString('woo') // statement name
|
||||
.addInt16(0)
|
||||
.addInt16(4)
|
||||
.addInt32(1)
|
||||
.add(Buffer.from('1'))
|
||||
.addInt32(2)
|
||||
.add(Buffer.from('hi'))
|
||||
.addInt32(-1)
|
||||
.addInt32(4)
|
||||
.add(Buffer.from('zing'))
|
||||
.addInt16(0)
|
||||
.join(true, 'B')
|
||||
assert.received(stream, expectedBuffer)
|
||||
})
|
||||
})
|
||||
|
||||
test('with named statement, portal, and buffer value', function () {
|
||||
con.bind({
|
||||
portal: 'bang',
|
||||
statement: 'woo',
|
||||
values: ['1', 'hi', null, Buffer.from('zing', 'utf8')],
|
||||
})
|
||||
var expectedBuffer = new BufferList()
|
||||
.addCString('bang') // portal name
|
||||
.addCString('woo') // statement name
|
||||
.addInt16(4) // value count
|
||||
.addInt16(0) // string
|
||||
.addInt16(0) // string
|
||||
.addInt16(0) // string
|
||||
.addInt16(1) // binary
|
||||
.addInt16(4)
|
||||
.addInt32(1)
|
||||
.add(Buffer.from('1'))
|
||||
.addInt32(2)
|
||||
.add(Buffer.from('hi'))
|
||||
.addInt32(-1)
|
||||
.addInt32(4)
|
||||
.add(Buffer.from('zing', 'UTF-8'))
|
||||
.addInt16(0)
|
||||
.join(true, 'B')
|
||||
assert.received(stream, expectedBuffer)
|
||||
})
|
||||
|
||||
test('sends execute message', function () {
|
||||
test('for unamed portal with no row limit', function () {
|
||||
con.execute()
|
||||
var expectedBuffer = new BufferList().addCString('').addInt32(0).join(true, 'E')
|
||||
assert.received(stream, expectedBuffer)
|
||||
})
|
||||
|
||||
test('for named portal with row limit', function () {
|
||||
con.execute({
|
||||
portal: 'my favorite portal',
|
||||
rows: 100,
|
||||
})
|
||||
var expectedBuffer = new BufferList().addCString('my favorite portal').addInt32(100).join(true, 'E')
|
||||
assert.received(stream, expectedBuffer)
|
||||
})
|
||||
})
|
||||
|
||||
test('sends flush command', function () {
|
||||
con.flush()
|
||||
var expected = new BufferList().join(true, 'H')
|
||||
assert.received(stream, expected)
|
||||
})
|
||||
|
||||
test('sends sync command', function () {
|
||||
con.sync()
|
||||
var expected = new BufferList().join(true, 'S')
|
||||
assert.received(stream, expected)
|
||||
})
|
||||
|
||||
test('sends end command', function () {
|
||||
con.end()
|
||||
var expected = Buffer.from([0x58, 0, 0, 0, 4])
|
||||
assert.received(stream, expected)
|
||||
assert.equal(stream.closed, true)
|
||||
})
|
||||
|
||||
test('sends describe command', function () {
|
||||
test('describe statement', function () {
|
||||
con.describe({ type: 'S', name: 'bang' })
|
||||
var expected = new BufferList().addChar('S').addCString('bang').join(true, 'D')
|
||||
assert.received(stream, expected)
|
||||
})
|
||||
|
||||
test('describe unnamed portal', function () {
|
||||
con.describe({ type: 'P' })
|
||||
var expected = new BufferList().addChar('P').addCString('').join(true, 'D')
|
||||
assert.received(stream, expected)
|
||||
})
|
||||
})
|
||||
@ -17,6 +17,8 @@ p.connect = function () {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
p.setNoDelay = () => {}
|
||||
|
||||
p.write = function (packet, cb) {
|
||||
this.packets.push(packet)
|
||||
if (cb) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user