Decouple serializing messages w/ writing them to socket (#2155)

* Move message writing to typescript lib

* Write more tests, cleanup code to some extent

* Rename package to something more representing its name

* Remove unused code

* Small tweaks based on microbenchmarks

* Rename w/o underscore
This commit is contained in:
Brian C 2020-04-09 12:28:19 -05:00 committed by GitHub
parent 2013d77b28
commit 3ff91eaa32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 685 additions and 188 deletions

View File

@ -11,7 +11,7 @@
],
"scripts": {
"test": "yarn lerna exec yarn test",
"build": "yarn lerna exec --scope pg-packet-stream yarn build",
"build": "yarn lerna exec --scope pg-protocol yarn build",
"pretest": "yarn build",
"lint": "yarn lerna exec --parallel yarn lint"
},

View File

@ -1,6 +1,7 @@
{
"name": "pg-packet-stream",
"name": "pg-protocol",
"version": "1.1.0",
"description": "The postgres client/server binary protocol, implemented in TypeScript",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",

View File

@ -0,0 +1,24 @@
// file for microbenchmarking
import { Writer } from './buffer-writer'
import { serialize } from './index'
const LOOPS = 1000
let count = 0
let start = Date.now()
const writer = new Writer()
const run = () => {
if (count > LOOPS) {
console.log(Date.now() - start)
return;
}
count++
for(let i = 0; i < LOOPS; i++) {
serialize.describe({ type: 'P'})
serialize.describe({ type: 'S'})
}
setImmediate(run)
}
run()

View File

@ -2,8 +2,10 @@ const emptyBuffer = Buffer.allocUnsafe(0);
export class BufferReader {
private buffer: Buffer = emptyBuffer;
// TODO(bmc): support non-utf8 encoding
// TODO(bmc): support non-utf8 encoding?
private encoding: string = 'utf-8';
constructor(private offset: number = 0) {
}
public setBuffer(offset: number, buffer: Buffer): void {

View File

@ -0,0 +1,87 @@
//binary data writer tuned for encoding binary specific to the postgres binary protocol
export class Writer {
private buffer: Buffer;
private offset: number = 5;
private headerPosition: number = 0;
constructor(private size = 256) {
this.buffer = Buffer.alloc(size)
}
private ensure(size: number): void {
var remaining = this.buffer.length - this.offset;
if (remaining < size) {
var oldBuffer = this.buffer;
// exponential growth factor of around ~ 1.5
// https://stackoverflow.com/questions/2269063/buffer-growth-strategy
var newSize = oldBuffer.length + (oldBuffer.length >> 1) + size;
this.buffer = Buffer.alloc(newSize);
oldBuffer.copy(this.buffer);
}
}
public addInt32(num: number): Writer {
this.ensure(4);
this.buffer[this.offset++] = (num >>> 24 & 0xFF);
this.buffer[this.offset++] = (num >>> 16 & 0xFF);
this.buffer[this.offset++] = (num >>> 8 & 0xFF);
this.buffer[this.offset++] = (num >>> 0 & 0xFF);
return this;
}
public addInt16(num: number): Writer {
this.ensure(2);
this.buffer[this.offset++] = (num >>> 8 & 0xFF);
this.buffer[this.offset++] = (num >>> 0 & 0xFF);
return this;
}
public addCString(string: string): Writer {
if (!string) {
this.ensure(1);
} else {
var len = Buffer.byteLength(string);
this.ensure(len + 1); // +1 for null terminator
this.buffer.write(string, this.offset, 'utf-8')
this.offset += len;
}
this.buffer[this.offset++] = 0; // null terminator
return this;
}
public addString(string: string = ""): Writer {
var len = Buffer.byteLength(string);
this.ensure(len);
this.buffer.write(string, this.offset);
this.offset += len;
return this;
}
public add(otherBuffer: Buffer): Writer {
this.ensure(otherBuffer.length);
otherBuffer.copy(this.buffer, this.offset);
this.offset += otherBuffer.length;
return this;
}
private join(code?: number): Buffer {
if (code) {
this.buffer[this.headerPosition] = code;
//length is everything in this packet minus the code
const length = this.offset - (this.headerPosition + 1)
this.buffer.writeInt32BE(length, this.headerPosition + 1)
}
return this.buffer.slice(code ? 0 : 5, this.offset);
}
public flush(code?: number): Buffer {
var result = this.join(code);
this.offset = 5;
this.headerPosition = 0;
this.buffer = Buffer.allocUnsafe(this.size)
return result;
}
}

View File

@ -1,6 +1,6 @@
import buffers from './testing/test-buffers'
import BufferList from './testing/buffer-list'
import { parse } from './'
import { parse } from '.'
import assert from 'assert'
import { PassThrough } from 'stream'
import { BackendMessage } from './messages'

View File

@ -0,0 +1,11 @@
import { BackendMessage } from './messages';
import { serialize } from './serializer';
import { Parser, MessageCallback } from './parser'
export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise<void> {
const parser = new Parser()
stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback))
return new Promise((resolve) => stream.on('end', () => resolve()))
}
export { serialize };

View File

@ -0,0 +1,256 @@
import assert from 'assert'
import { serialize } from './serializer'
import BufferList from './testing/buffer-list'
describe('serializer', () => {
it('builds startup message', function () {
const actual = serialize.startup({
user: 'brian',
database: 'bang'
})
assert.deepEqual(actual, new BufferList()
.addInt16(3)
.addInt16(0)
.addCString('user')
.addCString('brian')
.addCString('database')
.addCString('bang')
.addCString('client_encoding')
.addCString("'utf-8'")
.addCString('').join(true))
})
it('builds password message', function () {
const actual = serialize.password('!')
assert.deepEqual(actual, new BufferList().addCString('!').join(true, 'p'))
})
it('builds request ssl message', function () {
const actual = serialize.requestSsl()
const expected = new BufferList().addInt32(80877103).join(true)
assert.deepEqual(actual, expected);
})
it('builds SASLInitialResponseMessage message', function () {
const actual = serialize.sendSASLInitialResponseMessage('mech', 'data')
assert.deepEqual(actual, new BufferList().addCString('mech').addInt32(4).addString('data').join(true, 'p'))
})
it('builds SCRAMClientFinalMessage message', function () {
const actual = serialize.sendSCRAMClientFinalMessage('data')
assert.deepEqual(actual, new BufferList().addString('data').join(true, 'p'))
})
it('builds query message', function () {
var txt = 'select * from boom'
const actual = serialize.query(txt)
assert.deepEqual(actual, new BufferList().addCString(txt).join(true, 'Q'))
})
describe('parse message', () => {
it('builds parse message', function () {
const actual = serialize.parse({ text: '!' })
var expected = new BufferList()
.addCString('')
.addCString('!')
.addInt16(0).join(true, 'P')
assert.deepEqual(actual, expected)
})
it('builds parse message with named query', function () {
const actual = serialize.parse({
name: 'boom',
text: 'select * from boom',
types: []
})
var expected = new BufferList()
.addCString('boom')
.addCString('select * from boom')
.addInt16(0).join(true, 'P')
assert.deepEqual(actual, expected)
})
it('with multiple parameters', function () {
const actual = serialize.parse({
name: 'force',
text: 'select * from bang where name = $1',
types: [1, 2, 3, 4]
})
var expected = new BufferList()
.addCString('force')
.addCString('select * from bang where name = $1')
.addInt16(4)
.addInt32(1)
.addInt32(2)
.addInt32(3)
.addInt32(4).join(true, 'P')
assert.deepEqual(actual, expected)
})
})
describe('bind messages', function () {
it('with no values', function () {
const actual = serialize.bind()
var expectedBuffer = new BufferList()
.addCString('')
.addCString('')
.addInt16(0)
.addInt16(0)
.addInt16(0)
.join(true, 'B')
assert.deepEqual(actual, expectedBuffer)
})
it('with named statement, portal, and values', function () {
const actual = serialize.bind({
portal: 'bang',
statement: 'woo',
values: ['1', 'hi', null, 'zing']
})
var expectedBuffer = new BufferList()
.addCString('bang') // portal name
.addCString('woo') // statement name
.addInt16(0)
.addInt16(4)
.addInt32(1)
.add(Buffer.from('1'))
.addInt32(2)
.add(Buffer.from('hi'))
.addInt32(-1)
.addInt32(4)
.add(Buffer.from('zing'))
.addInt16(0)
.join(true, 'B')
assert.deepEqual(actual, expectedBuffer)
})
})
it('with named statement, portal, and buffer value', function () {
const actual = serialize.bind({
portal: 'bang',
statement: 'woo',
values: ['1', 'hi', null, Buffer.from('zing', 'utf8')]
})
var expectedBuffer = new BufferList()
.addCString('bang') // portal name
.addCString('woo') // statement name
.addInt16(4)// value count
.addInt16(0)// string
.addInt16(0)// string
.addInt16(0)// string
.addInt16(1)// binary
.addInt16(4)
.addInt32(1)
.add(Buffer.from('1'))
.addInt32(2)
.add(Buffer.from('hi'))
.addInt32(-1)
.addInt32(4)
.add(Buffer.from('zing', 'utf-8'))
.addInt16(0)
.join(true, 'B')
assert.deepEqual(actual, expectedBuffer)
})
describe('builds execute message', function () {
it('for unamed portal with no row limit', function () {
const actual = serialize.execute()
var expectedBuffer = new BufferList()
.addCString('')
.addInt32(0)
.join(true, 'E')
assert.deepEqual(actual, expectedBuffer)
})
it('for named portal with row limit', function () {
const actual = serialize.execute({
portal: 'my favorite portal',
rows: 100
})
var expectedBuffer = new BufferList()
.addCString('my favorite portal')
.addInt32(100)
.join(true, 'E')
assert.deepEqual(actual, expectedBuffer)
})
})
it('builds flush command', function () {
const actual = serialize.flush()
var expected = new BufferList().join(true, 'H')
assert.deepEqual(actual, expected)
})
it('builds sync command', function () {
const actual = serialize.sync()
var expected = new BufferList().join(true, 'S')
assert.deepEqual(actual, expected)
})
it('builds end command', function () {
const actual = serialize.end()
var expected = Buffer.from([0x58, 0, 0, 0, 4])
assert.deepEqual(actual, expected)
})
describe('builds describe command', function () {
it('describe statement', function () {
const actual = serialize.describe({ type: 'S', name: 'bang' })
var expected = new BufferList().addChar('S').addCString('bang').join(true, 'D')
assert.deepEqual(actual, expected)
})
it('describe unnamed portal', function () {
const actual = serialize.describe({ type: 'P' })
var expected = new BufferList().addChar('P').addCString('').join(true, 'D')
assert.deepEqual(actual, expected)
})
})
describe('builds close command', function () {
it('describe statement', function () {
const actual = serialize.close({ type: 'S', name: 'bang' })
var expected = new BufferList().addChar('S').addCString('bang').join(true, 'C')
assert.deepEqual(actual, expected)
})
it('describe unnamed portal', function () {
const actual = serialize.close({ type: 'P' })
var expected = new BufferList().addChar('P').addCString('').join(true, 'C')
assert.deepEqual(actual, expected)
})
})
describe('copy messages', function () {
it('builds copyFromChunk', () => {
const actual = serialize.copyData(Buffer.from([1, 2, 3]))
const expected = new BufferList().add(Buffer.from([1, 2,3 ])).join(true, 'd')
assert.deepEqual(actual, expected)
})
it('builds copy fail', () => {
const actual = serialize.copyFail('err!')
const expected = new BufferList().addCString('err!').join(true, 'f')
assert.deepEqual(actual, expected)
})
it('builds copy done', () => {
const actual = serialize.copyDone()
const expected = new BufferList().join(true, 'c')
assert.deepEqual(actual, expected)
})
})
it('builds cancel message', () => {
const actual = serialize.cancel(3, 4)
const expected = new BufferList().addInt16(1234).addInt16(5678).addInt32(3).addInt32(4).join(true)
assert.deepEqual(actual, expected)
})
})

View File

@ -1,6 +1,6 @@
import { TransformOptions } from 'stream';
import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password, NoticeMessage } from './messages';
import { BufferReader } from './BufferReader';
import { BufferReader } from './buffer-reader';
import assert from 'assert'
// every message is prefixed with a single bye
@ -46,15 +46,9 @@ const enum MessageCodes {
CopyData = 0x64, // d
}
type MessageCallback = (msg: BackendMessage) => void;
export type MessageCallback = (msg: BackendMessage) => void;
export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise<void> {
const parser = new PgPacketParser()
stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback))
return new Promise((resolve) => stream.on('end', () => resolve()))
}
class PgPacketParser {
export class Parser {
private remainingBuffer: Buffer = emptyBuffer;
private reader = new BufferReader();
private mode: Mode;

View File

@ -0,0 +1,272 @@
import { Writer } from './buffer-writer'
const enum code {
startup = 0x70,
query = 0x51,
parse = 0x50,
bind = 0x42,
execute = 0x45,
flush = 0x48,
sync = 0x53,
end = 0x58,
close = 0x43,
describe = 0x44,
copyFromChunk = 0x64,
copyDone = 0x63,
copyFail = 0x66
}
const writer = new Writer()
const startup = (opts: Record<string, string>): Buffer => {
// protocol version
writer.addInt16(3).addInt16(0)
for (const key of Object.keys(opts)) {
writer.addCString(key).addCString(opts[key])
}
writer.addCString('client_encoding').addCString("'utf-8'")
var bodyBuffer = writer.addCString('').flush()
// this message is sent without a code
var length = bodyBuffer.length + 4
return new Writer()
.addInt32(length)
.add(bodyBuffer)
.flush()
}
const requestSsl = (): Buffer => {
const response = Buffer.allocUnsafe(8)
response.writeInt32BE(8, 0);
response.writeInt32BE(80877103, 4)
return response
}
const password = (password: string): Buffer => {
return writer.addCString(password).flush(code.startup)
}
const sendSASLInitialResponseMessage = function (mechanism: string, initialResponse: string): Buffer {
// 0x70 = 'p'
writer
.addCString(mechanism)
.addInt32(Buffer.byteLength(initialResponse))
.addString(initialResponse)
return writer.flush(code.startup)
}
const sendSCRAMClientFinalMessage = function (additionalData: string): Buffer {
return writer.addString(additionalData).flush(code.startup)
}
const query = (text: string): Buffer => {
return writer.addCString(text).flush(code.query)
}
type ParseOpts = {
name?: string;
types?: number[];
text: string;
}
const emptyArray: any[] = []
const parse = (query: ParseOpts): Buffer => {
// expect something like this:
// { name: 'queryName',
// text: 'select * from blah',
// types: ['int8', 'bool'] }
// normalize missing query names to allow for null
const name = query.name || ''
if (name.length > 63) {
/* eslint-disable no-console */
console.error('Warning! Postgres only supports 63 characters for query names.')
console.error('You supplied %s (%s)', name, name.length)
console.error('This can cause conflicts and silent errors executing queries')
/* eslint-enable no-console */
}
const types = query.types || emptyArray
var len = types.length
var buffer = writer
.addCString(name) // name of query
.addCString(query.text) // actual query text
.addInt16(len)
for (var i = 0; i < len; i++) {
buffer.addInt32(types[i])
}
return writer.flush(code.parse)
}
type BindOpts = {
portal?: string;
binary?: boolean;
statement?: string;
values?: any[];
}
const bind = (config: BindOpts = {}): Buffer => {
// normalize config
const portal = config.portal || ''
const statement = config.statement || ''
const binary = config.binary || false
var values = config.values || emptyArray
var len = values.length
var useBinary = false
// TODO(bmc): all the loops in here aren't nice, we can do better
for (var j = 0; j < len; j++) {
useBinary = useBinary || values[j] instanceof Buffer
}
var buffer = writer
.addCString(portal)
.addCString(statement)
if (!useBinary) {
buffer.addInt16(0)
} else {
buffer.addInt16(len)
for (j = 0; j < len; j++) {
buffer.addInt16(values[j] instanceof Buffer ? 1 : 0)
}
}
buffer.addInt16(len)
for (var i = 0; i < len; i++) {
var val = values[i]
if (val === null || typeof val === 'undefined') {
buffer.addInt32(-1)
} else if (val instanceof Buffer) {
buffer.addInt32(val.length)
buffer.add(val)
} else {
buffer.addInt32(Buffer.byteLength(val))
buffer.addString(val)
}
}
if (binary) {
buffer.addInt16(1) // format codes to use binary
buffer.addInt16(1)
} else {
buffer.addInt16(0) // format codes to use text
}
return writer.flush(code.bind)
}
type ExecOpts = {
portal?: string;
rows?: number;
}
const emptyExecute = Buffer.from([code.execute, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00])
const execute = (config?: ExecOpts): Buffer => {
// this is the happy path for most queries
if (!config || !config.portal && !config.rows) {
return emptyExecute;
}
const portal = config.portal || ''
const rows = config.rows || 0
const portalLength = Buffer.byteLength(portal)
const len = 4 + portalLength + 1 + 4
// one extra bit for code
const buff = Buffer.allocUnsafe(1 + len)
buff[0] = code.execute
buff.writeInt32BE(len, 1)
buff.write(portal, 5, 'utf-8')
buff[portalLength + 5] = 0; // null terminate portal cString
buff.writeUInt32BE(rows, buff.length - 4)
return buff;
}
const cancel = (processID: number, secretKey: number): Buffer => {
const buffer = Buffer.allocUnsafe(16)
buffer.writeInt32BE(16, 0)
buffer.writeInt16BE(1234, 4)
buffer.writeInt16BE(5678, 6)
buffer.writeInt32BE(processID, 8)
buffer.writeInt32BE(secretKey, 12)
return buffer;
}
type PortalOpts = {
type: 'S' | 'P',
name?: string;
}
const cstringMessage = (code: code, string: string): Buffer => {
const stringLen = Buffer.byteLength(string)
const len = 4 + stringLen + 1
// one extra bit for code
const buffer = Buffer.allocUnsafe(1 + len)
buffer[0] = code
buffer.writeInt32BE(len, 1)
buffer.write(string, 5, 'utf-8')
buffer[len] = 0 // null terminate cString
return buffer
}
const emptyDescribePortal = writer.addCString('P').flush(code.describe)
const emptyDescribeStatement = writer.addCString('S').flush(code.describe)
const describe = (msg: PortalOpts): Buffer => {
return msg.name ?
cstringMessage(code.describe,`${msg.type}${msg.name || ''}`) :
msg.type === 'P' ?
emptyDescribePortal :
emptyDescribeStatement;
}
const close = (msg: PortalOpts): Buffer => {
const text = `${msg.type}${msg.name || ''}`
return cstringMessage(code.close, text)
}
const copyData = (chunk: Buffer): Buffer => {
return writer.add(chunk).flush(code.copyFromChunk)
}
const copyFail = (message: string): Buffer => {
return cstringMessage(code.copyFail, message);
}
const codeOnlyBuffer = (code: code): Buffer => Buffer.from([code, 0x00, 0x00, 0x00, 0x04])
const flushBuffer = codeOnlyBuffer(code.flush)
const syncBuffer = codeOnlyBuffer(code.sync)
const endBuffer = codeOnlyBuffer(code.end)
const copyDoneBuffer = codeOnlyBuffer(code.copyDone)
const serialize = {
startup,
password,
requestSsl,
sendSASLInitialResponseMessage,
sendSCRAMClientFinalMessage,
query,
parse,
bind,
execute,
describe,
close,
flush: () => flushBuffer,
sync: () => syncBuffer,
end: () => endBuffer,
copyData,
copyDone: () => copyDoneBuffer,
copyFail,
cancel
}
export { serialize }

View File

@ -11,11 +11,8 @@ var net = require('net')
var EventEmitter = require('events').EventEmitter
var util = require('util')
var Writer = require('buffer-writer')
// eslint-disable-next-line
const { parse } = require('pg-packet-stream')
var TEXT_MODE = 0
const { parse, serialize } = require('../../pg-protocol/dist')
// TODO(bmc) support binary mode here
// var BINARY_MODE = 1
@ -28,15 +25,9 @@ var Connection = function (config) {
this._keepAlive = config.keepAlive
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
this.lastBuffer = false
this.lastOffset = 0
this.buffer = null
this.offset = null
this.encoding = config.encoding || 'utf8'
this.parsedStatements = {}
this.writer = new Writer()
this.ssl = config.ssl || false
this._ending = false
this._mode = TEXT_MODE
this._emitMessage = false
var self = this
this.on('newListener', function (eventName) {
@ -122,244 +113,103 @@ Connection.prototype.attachListeners = function (stream) {
}
Connection.prototype.requestSsl = function () {
var bodyBuffer = this.writer
.addInt16(0x04d2)
.addInt16(0x162f)
.flush()
var length = bodyBuffer.length + 4
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join()
this.stream.write(buffer)
this.stream.write(serialize.requestSsl())
}
Connection.prototype.startup = function (config) {
var writer = this.writer.addInt16(3).addInt16(0)
Object.keys(config).forEach(function (key) {
var val = config[key]
writer.addCString(key).addCString(val)
})
writer.addCString('client_encoding').addCString("'utf-8'")
var bodyBuffer = writer.addCString('').flush()
// this message is sent without a code
var length = bodyBuffer.length + 4
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join()
this.stream.write(buffer)
this.stream.write(serialize.startup(config))
}
Connection.prototype.cancel = function (processID, secretKey) {
var bodyBuffer = this.writer
.addInt16(1234)
.addInt16(5678)
.addInt32(processID)
.addInt32(secretKey)
.flush()
var length = bodyBuffer.length + 4
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join()
this.stream.write(buffer)
this._send(serialize.cancel(processID, secretKey))
}
Connection.prototype.password = function (password) {
// 0x70 = 'p'
this._send(0x70, this.writer.addCString(password))
this._send(serialize.password(password))
}
Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) {
// 0x70 = 'p'
this.writer
.addCString(mechanism)
.addInt32(Buffer.byteLength(initialResponse))
.addString(initialResponse)
this._send(0x70)
this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
}
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
// 0x70 = 'p'
this.writer.addString(additionalData)
this._send(0x70)
this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
}
Connection.prototype._send = function (code, more) {
Connection.prototype._send = function (buffer) {
if (!this.stream.writable) {
return false
}
return this.stream.write(this.writer.flush(code))
return this.stream.write(buffer)
}
Connection.prototype.query = function (text) {
// 0x51 = Q
this.stream.write(this.writer.addCString(text).flush(0x51))
this._send(serialize.query(text))
}
// send parse message
Connection.prototype.parse = function (query) {
// expect something like this:
// { name: 'queryName',
// text: 'select * from blah',
// types: ['int8', 'bool'] }
// normalize missing query names to allow for null
query.name = query.name || ''
if (query.name.length > 63) {
/* eslint-disable no-console */
console.error('Warning! Postgres only supports 63 characters for query names.')
console.error('You supplied %s (%s)', query.name, query.name.length)
console.error('This can cause conflicts and silent errors executing queries')
/* eslint-enable no-console */
}
// normalize null type array
query.types = query.types || []
var len = query.types.length
var buffer = this.writer
.addCString(query.name) // name of query
.addCString(query.text) // actual query text
.addInt16(len)
for (var i = 0; i < len; i++) {
buffer.addInt32(query.types[i])
}
var code = 0x50
this._send(code)
this.flush()
this._send(serialize.parse(query))
}
// send bind message
// "more" === true to buffer the message until flush() is called
Connection.prototype.bind = function (config) {
// normalize config
config = config || {}
config.portal = config.portal || ''
config.statement = config.statement || ''
config.binary = config.binary || false
var values = config.values || []
var len = values.length
var useBinary = false
for (var j = 0; j < len; j++) {
useBinary |= values[j] instanceof Buffer
}
var buffer = this.writer.addCString(config.portal).addCString(config.statement)
if (!useBinary) {
buffer.addInt16(0)
} else {
buffer.addInt16(len)
for (j = 0; j < len; j++) {
buffer.addInt16(values[j] instanceof Buffer)
}
}
buffer.addInt16(len)
for (var i = 0; i < len; i++) {
var val = values[i]
if (val === null || typeof val === 'undefined') {
buffer.addInt32(-1)
} else if (val instanceof Buffer) {
buffer.addInt32(val.length)
buffer.add(val)
} else {
buffer.addInt32(Buffer.byteLength(val))
buffer.addString(val)
}
}
if (config.binary) {
buffer.addInt16(1) // format codes to use binary
buffer.addInt16(1)
} else {
buffer.addInt16(0) // format codes to use text
}
// 0x42 = 'B'
this._send(0x42)
this.flush()
this._send(serialize.bind(config))
}
// send execute message
// "more" === true to buffer the message until flush() is called
Connection.prototype.execute = function (config) {
config = config || {}
config.portal = config.portal || ''
config.rows = config.rows || ''
this.writer.addCString(config.portal).addInt32(config.rows)
// 0x45 = 'E'
this._send(0x45)
this.flush()
this._send(serialize.execute(config))
}
var emptyBuffer = Buffer.alloc(0)
const flushBuffer = Buffer.from([0x48, 0x00, 0x00, 0x00, 0x04])
const flushBuffer = serialize.flush()
Connection.prototype.flush = function () {
if (this.stream.writable) {
this.stream.write(flushBuffer)
}
}
const syncBuffer = Buffer.from([0x53, 0x00, 0x00, 0x00, 0x04])
const syncBuffer = serialize.sync()
Connection.prototype.sync = function () {
this._ending = true
// clear out any pending data in the writer
this.writer.clear()
if (this.stream.writable) {
this.stream.write(syncBuffer)
this.stream.write(flushBuffer)
}
this._send(syncBuffer)
this._send(flushBuffer)
}
const END_BUFFER = Buffer.from([0x58, 0x00, 0x00, 0x00, 0x04])
const endBuffer = serialize.end()
Connection.prototype.end = function () {
// 0x58 = 'X'
this.writer.clear()
this._ending = true
if (!this.stream.writable) {
this.stream.end()
return
}
return this.stream.write(END_BUFFER, () => {
return this.stream.write(endBuffer, () => {
this.stream.end()
})
}
Connection.prototype.close = function (msg) {
this.writer.addCString(msg.type + (msg.name || ''))
this._send(0x43)
this._send(serialize.close(msg))
}
Connection.prototype.describe = function (msg) {
this.writer.addCString(msg.type + (msg.name || ''))
this._send(0x44)
this.flush()
this._send(serialize.describe(msg))
}
Connection.prototype.sendCopyFromChunk = function (chunk) {
this.stream.write(this.writer.add(chunk).flush(0x64))
this._send(serialize.copyData(chunk))
}
Connection.prototype.endCopyFrom = function () {
this.stream.write(this.writer.add(emptyBuffer).flush(0x63))
this._send(serialize.copyDone())
}
Connection.prototype.sendCopyFail = function (msg) {
// this.stream.write(this.writer.add(emptyBuffer).flush(0x66));
this.writer.addCString(msg)
this._send(0x66)
this._send(serialize.copyFail(msg))
}
module.exports = Connection

View File

@ -22,7 +22,7 @@
"buffer-writer": "2.0.0",
"packet-reader": "1.0.0",
"pg-connection-string": "0.1.3",
"pg-packet-stream": "^1.1.0",
"pg-protocol": "^1.1.0",
"pg-pool": "^3.0.0",
"pg-types": "^2.1.0",
"pgpass": "1.x",