Merge pull request #2044 from brianc/bmc/packet-stream-parser

Add packet stream parser
This commit is contained in:
Brian C 2019-12-26 21:36:26 -06:00 committed by GitHub
commit 3278dced6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1990 additions and 65 deletions
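For orientation: the new pg-packet-stream package added in this PR exposes a Transform stream that turns raw PostgreSQL protocol bytes into parsed message objects. A minimal sketch of how it is consumed (illustrative only, based on the tests and connection-fast.js below; not itself part of the diff):

import { PgPacketStream } from 'pg-packet-stream'

const parser = new PgPacketStream({ mode: 'text' })
parser.on('data', (msg) => console.log(msg.name, msg.length))
// protocol bytes can be written in arbitrary chunks; the parser buffers partial frames
parser.write(Buffer.from([0x5a, 0x00, 0x00])) // first half of a ReadyForQuery frame
parser.write(Buffer.from([0x00, 0x05, 0x49])) // second half
parser.end() // emits { name: 'readyForQuery', length: 5, status: 'I' }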

@ -32,7 +32,7 @@ RUN apt-get update \
&& curl -sS https://dl.yarnpkg.com/$(lsb_release -is | tr '[:upper:]' '[:lower:]')/pubkey.gpg | apt-key add - 2>/dev/null \
&& echo "deb https://dl.yarnpkg.com/$(lsb_release -is | tr '[:upper:]' '[:lower:]')/ stable main" | tee /etc/apt/sources.list.d/yarn.list \
&& apt-get update \
&& apt-get -y install --no-install-recommends yarn tmux locales \
&& apt-get -y install --no-install-recommends yarn tmux locales postgresql \
#
# Install eslint globally
&& npm install -g eslint \

@ -1,8 +1,18 @@
{
"plugins": ["node"],
"extends": ["standard", "eslint:recommended", "plugin:node/recommended"],
"plugins": [
"node"
],
"extends": [
"standard",
"eslint:recommended",
"plugin:node/recommended"
],
"ignorePatterns": [
"**/*.ts"
],
"parserOptions": {
"ecmaVersion": 2017
"ecmaVersion": 2017,
"sourceType": "module"
},
"env": {
"node": true,
@ -11,10 +21,13 @@
},
"rules": {
"space-before-function-paren": "off",
"node/no-unsupported-features/es-syntax": "off",
"node/no-unpublished-require": [
"error",
{
"allowModules": ["pg"]
"allowModules": [
"pg"
]
}
]
}

.gitignore (vendored): 2 changes

@ -5,3 +5,5 @@ build/
node_modules/
package-lock.json
*.swp
dist
.DS_Store

@ -0,0 +1,21 @@
{
"name": "pg-packet-stream",
"version": "1.0.0",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"devDependencies": {
"@types/node": "^12.12.21",
"chunky": "^0.0.0",
"typescript": "^3.7.3",
"@types/chai": "^4.2.7",
"@types/mocha": "^5.2.7",
"chai": "^4.2.0",
"mocha": "^6.2.2",
"ts-node": "^8.5.4"
},
"scripts": {
"test": "mocha -r ts-node/register src/**/*.test.ts"
},
"dependencies": {}
}

@ -0,0 +1,44 @@
const emptyBuffer = Buffer.allocUnsafe(0);
export class BufferReader {
private buffer: Buffer = emptyBuffer;
// TODO(bmc): support non-utf8 encoding
private encoding: string = 'utf-8';
constructor(private offset: number = 0) {
}
public setBuffer(offset: number, buffer: Buffer): void {
this.offset = offset;
this.buffer = buffer;
}
public int16() {
const result = this.buffer.readInt16BE(this.offset);
this.offset += 2;
return result;
}
public byte() {
const result = this.buffer[this.offset];
this.offset++;
return result;
}
public int32() {
const result = this.buffer.readInt32BE(this.offset);
this.offset += 4;
return result;
}
public string(length: number): string {
const result = this.buffer.toString(this.encoding, this.offset, this.offset + length);
this.offset += length;
return result;
}
public cstring(): string {
var start = this.offset;
var end = this.buffer.indexOf(0, start);
this.offset = end + 1;
return this.buffer.toString(this.encoding, start, end);
}
public bytes(length: number): Buffer {
const result = this.buffer.slice(this.offset, this.offset + length);
this.offset += length;
return result;
}
}
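A quick illustration of how BufferReader is driven (a hypothetical snippet, not part of the diff; the relative import path is assumed):

import { BufferReader } from './BufferReader'

// an int16 (3) followed by a NUL-terminated string 'hi'
const buf = Buffer.from([0x00, 0x03, 0x68, 0x69, 0x00])
const reader = new BufferReader()
reader.setBuffer(0, buf)
reader.int16()   // => 3
reader.cstring() // => 'hi', offset now sits past the terminator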

@ -0,0 +1,510 @@
import buffers from './testing/test-buffers'
import BufferList from './testing/buffer-list'
import { PgPacketStream } from './'
import assert from 'assert'
import { Readable } from 'stream'
var authOkBuffer = buffers.authenticationOk()
var paramStatusBuffer = buffers.parameterStatus('client_encoding', 'UTF8')
var readyForQueryBuffer = buffers.readyForQuery()
var backendKeyDataBuffer = buffers.backendKeyData(1, 2)
var commandCompleteBuffer = buffers.commandComplete('SELECT 3')
var parseCompleteBuffer = buffers.parseComplete()
var bindCompleteBuffer = buffers.bindComplete()
var portalSuspendedBuffer = buffers.portalSuspended()
var addRow = function (bufferList: BufferList, name: string, offset: number) {
return bufferList.addCString(name) // field name
.addInt32(offset++) // table id
.addInt16(offset++) // attribute number of the column
.addInt32(offset++) // objectId of field's data type
.addInt16(offset++) // datatype size
.addInt32(offset++) // type modifier
.addInt16(0) // format code, 0 => text
}
var row1 = {
name: 'id',
tableID: 1,
attributeNumber: 2,
dataTypeID: 3,
dataTypeSize: 4,
typeModifier: 5,
formatCode: 0
}
var oneRowDescBuff = buffers.rowDescription([row1])
row1.name = 'bang'
var twoRowBuf = buffers.rowDescription([row1, {
name: 'whoah',
tableID: 10,
attributeNumber: 11,
dataTypeID: 12,
dataTypeSize: 13,
typeModifier: 14,
formatCode: 0
}])
var emptyRowFieldBuf = new BufferList()
.addInt16(0)
.join(true, 'D')
var emptyRowFieldBuf = buffers.dataRow([])
var oneFieldBuf = new BufferList()
.addInt16(1) // number of fields
.addInt32(5) // length of bytes of fields
.addCString('test')
.join(true, 'D')
var oneFieldBuf = buffers.dataRow(['test'])
var expectedAuthenticationOkayMessage = {
name: 'authenticationOk',
length: 8
}
var expectedParameterStatusMessage = {
name: 'parameterStatus',
parameterName: 'client_encoding',
parameterValue: 'UTF8',
length: 25
}
var expectedBackendKeyDataMessage = {
name: 'backendKeyData',
processID: 1,
secretKey: 2
}
var expectedReadyForQueryMessage = {
name: 'readyForQuery',
length: 5,
status: 'I'
}
var expectedCommandCompleteMessage = {
name: 'commandComplete',
length: 13,
text: 'SELECT 3'
}
var emptyRowDescriptionBuffer = new BufferList()
.addInt16(0) // number of fields
.join(true, 'T')
var expectedEmptyRowDescriptionMessage = {
name: 'rowDescription',
length: 6,
fieldCount: 0,
fields: [],
}
var expectedOneRowMessage = {
name: 'rowDescription',
length: 27,
fieldCount: 1,
fields: [{
name: 'id',
tableID: 1,
columnID: 2,
dataTypeID: 3,
dataTypeSize: 4,
dataTypeModifier: 5,
format: 'text'
}]
}
var expectedTwoRowMessage = {
name: 'rowDescription',
length: 53,
fieldCount: 2,
fields: [{
name: 'bang',
tableID: 1,
columnID: 2,
dataTypeID: 3,
dataTypeSize: 4,
dataTypeModifier: 5,
format: 'text'
},
{
name: 'whoah',
tableID: 10,
columnID: 11,
dataTypeID: 12,
dataTypeSize: 13,
dataTypeModifier: 14,
format: 'text'
}]
}
const concat = (stream: Readable): Promise<any[]> => {
return new Promise((resolve) => {
const results: any[] = []
stream.on('data', item => results.push(item))
stream.on('end', () => resolve(results))
})
}
var testForMessage = function (buffer: Buffer, expectedMessage: any) {
it('receives and parses ' + expectedMessage.name, async () => {
const parser = new PgPacketStream();
parser.write(buffer);
parser.end();
const [lastMessage] = await concat(parser);
for (const key in expectedMessage) {
assert.deepEqual(lastMessage[key], expectedMessage[key])
}
})
}
var plainPasswordBuffer = buffers.authenticationCleartextPassword()
var md5PasswordBuffer = buffers.authenticationMD5Password()
var SASLBuffer = buffers.authenticationSASL()
var SASLContinueBuffer = buffers.authenticationSASLContinue()
var SASLFinalBuffer = buffers.authenticationSASLFinal()
var expectedPlainPasswordMessage = {
name: 'authenticationCleartextPassword'
}
var expectedMD5PasswordMessage = {
name: 'authenticationMD5Password',
salt: Buffer.from([1, 2, 3, 4])
}
var expectedSASLMessage = {
name: 'authenticationSASL',
mechanisms: ['SCRAM-SHA-256']
}
var expectedSASLContinueMessage = {
name: 'authenticationSASLContinue',
data: 'data',
}
var expectedSASLFinalMessage = {
name: 'authenticationSASLFinal',
data: 'data',
}
var notificationResponseBuffer = buffers.notification(4, 'hi', 'boom')
var expectedNotificationResponseMessage = {
name: 'notification',
processId: 4,
channel: 'hi',
payload: 'boom'
}
describe('PgPacketStream', function () {
testForMessage(authOkBuffer, expectedAuthenticationOkayMessage)
testForMessage(plainPasswordBuffer, expectedPlainPasswordMessage)
testForMessage(md5PasswordBuffer, expectedMD5PasswordMessage)
testForMessage(SASLBuffer, expectedSASLMessage)
testForMessage(SASLContinueBuffer, expectedSASLContinueMessage)
testForMessage(SASLFinalBuffer, expectedSASLFinalMessage)
testForMessage(paramStatusBuffer, expectedParameterStatusMessage)
testForMessage(backendKeyDataBuffer, expectedBackendKeyDataMessage)
testForMessage(readyForQueryBuffer, expectedReadyForQueryMessage)
testForMessage(commandCompleteBuffer, expectedCommandCompleteMessage)
testForMessage(notificationResponseBuffer, expectedNotificationResponseMessage)
testForMessage(buffers.emptyQuery(), {
name: 'emptyQuery',
length: 4,
})
testForMessage(Buffer.from([0x6e, 0, 0, 0, 4]), {
name: 'noData'
})
describe('rowDescription messages', function () {
testForMessage(emptyRowDescriptionBuffer, expectedEmptyRowDescriptionMessage)
testForMessage(oneRowDescBuff, expectedOneRowMessage)
testForMessage(twoRowBuf, expectedTwoRowMessage)
})
describe('parsing rows', function () {
describe('parsing empty row', function () {
testForMessage(emptyRowFieldBuf, {
name: 'dataRow',
fieldCount: 0
})
})
describe('parsing data row with fields', function () {
testForMessage(oneFieldBuf, {
name: 'dataRow',
fieldCount: 1,
fields: ['test']
})
})
})
describe('notice message', function () {
// this uses the same logic as error message
var buff = buffers.notice([{ type: 'C', value: 'code' }])
testForMessage(buff, {
name: 'notice',
code: 'code'
})
})
testForMessage(buffers.error([]), {
name: 'error'
})
describe('with all the fields', function () {
var buffer = buffers.error([{
type: 'S',
value: 'ERROR'
}, {
type: 'C',
value: 'code'
}, {
type: 'M',
value: 'message'
}, {
type: 'D',
value: 'details'
}, {
type: 'H',
value: 'hint'
}, {
type: 'P',
value: '100'
}, {
type: 'p',
value: '101'
}, {
type: 'q',
value: 'query'
}, {
type: 'W',
value: 'where'
}, {
type: 'F',
value: 'file'
}, {
type: 'L',
value: 'line'
}, {
type: 'R',
value: 'routine'
}, {
type: 'Z', // ignored
value: 'alsdkf'
}])
testForMessage(buffer, {
name: 'error',
severity: 'ERROR',
code: 'code',
message: 'message',
detail: 'details',
hint: 'hint',
position: '100',
internalPosition: '101',
internalQuery: 'query',
where: 'where',
file: 'file',
line: 'line',
routine: 'routine'
})
})
testForMessage(parseCompleteBuffer, {
name: 'parseComplete'
})
testForMessage(bindCompleteBuffer, {
name: 'bindComplete'
})
testForMessage(buffers.closeComplete(), {
name: 'closeComplete'
})
describe('parses portal suspended message', function () {
testForMessage(portalSuspendedBuffer, {
name: 'portalSuspended'
})
})
describe('parses replication start message', function () {
testForMessage(Buffer.from([0x57, 0x00, 0x00, 0x00, 0x04]), {
name: 'replicationStart',
length: 4
})
})
describe('copy', () => {
testForMessage(buffers.copyIn(0), {
name: 'copyInResponse',
length: 7,
binary: false,
columnTypes: []
})
testForMessage(buffers.copyIn(2), {
name: 'copyInResponse',
length: 11,
binary: false,
columnTypes: [0, 1]
})
testForMessage(buffers.copyOut(0), {
name: 'copyOutResponse',
length: 7,
binary: false,
columnTypes: []
})
testForMessage(buffers.copyOut(3), {
name: 'copyOutResponse',
length: 13,
binary: false,
columnTypes: [0, 1, 2]
})
testForMessage(buffers.copyDone(), {
name: 'copyDone',
length: 4,
})
testForMessage(buffers.copyData(Buffer.from([5, 6, 7])), {
name: 'copyData',
length: 7,
chunk: Buffer.from([5, 6, 7])
})
})
// since a TCP stream can split the incoming data anywhere, we need to
// make sure we can parse a message at every possible split point
describe('split buffer, single message parsing', function () {
var fullBuffer = buffers.dataRow([null, 'bang', 'zug zug', null, '!'])
const parse = async (buffers: Buffer[]): Promise<any> => {
const parser = new PgPacketStream();
for (const buffer of buffers) {
parser.write(buffer);
}
parser.end()
const [msg] = await concat(parser)
return msg;
}
it('parses when full buffer comes in', async function () {
const message = await parse([fullBuffer]);
assert.equal(message.fields.length, 5)
assert.equal(message.fields[0], null)
assert.equal(message.fields[1], 'bang')
assert.equal(message.fields[2], 'zug zug')
assert.equal(message.fields[3], null)
assert.equal(message.fields[4], '!')
})
var testMessageReceivedAfterSplitAt = async function (split: number) {
var firstBuffer = Buffer.alloc(fullBuffer.length - split)
var secondBuffer = Buffer.alloc(fullBuffer.length - firstBuffer.length)
fullBuffer.copy(firstBuffer, 0, 0)
fullBuffer.copy(secondBuffer, 0, firstBuffer.length)
const message = await parse([firstBuffer, secondBuffer]);
assert.equal(message.fields.length, 5)
assert.equal(message.fields[0], null)
assert.equal(message.fields[1], 'bang')
assert.equal(message.fields[2], 'zug zug')
assert.equal(message.fields[3], null)
assert.equal(message.fields[4], '!')
}
it('parses when split in the middle', function () {
return testMessageReceivedAfterSplitAt(6)
})
it('parses when split at end', function () {
return testMessageReceivedAfterSplitAt(2)
})
it('parses when split at beginning', function () {
return Promise.all([
testMessageReceivedAfterSplitAt(fullBuffer.length - 2),
testMessageReceivedAfterSplitAt(fullBuffer.length - 1),
testMessageReceivedAfterSplitAt(fullBuffer.length - 5)
])
})
})
describe('split buffer, multiple message parsing', function () {
var dataRowBuffer = buffers.dataRow(['!'])
var readyForQueryBuffer = buffers.readyForQuery()
var fullBuffer = Buffer.alloc(dataRowBuffer.length + readyForQueryBuffer.length)
dataRowBuffer.copy(fullBuffer, 0, 0)
readyForQueryBuffer.copy(fullBuffer, dataRowBuffer.length, 0)
const parse = (buffers: Buffer[]): Promise<any[]> => {
const parser = new PgPacketStream();
for (const buffer of buffers) {
parser.write(buffer);
}
parser.end()
return concat(parser)
}
var verifyMessages = function (messages: any[]) {
assert.strictEqual(messages.length, 2)
assert.deepEqual(messages[0], {
name: 'dataRow',
fieldCount: 1,
length: 11,
fields: ['!']
})
assert.equal(messages[0].fields[0], '!')
assert.deepEqual(messages[1], {
name: 'readyForQuery',
length: 5,
status: 'I'
})
}
// sanity check
it('receives both messages when packet is not split', async function () {
const messages = await parse([fullBuffer])
verifyMessages(messages)
})
var splitAndVerifyTwoMessages = async function (split: number) {
var firstBuffer = Buffer.alloc(fullBuffer.length - split)
var secondBuffer = Buffer.alloc(fullBuffer.length - firstBuffer.length)
fullBuffer.copy(firstBuffer, 0, 0)
fullBuffer.copy(secondBuffer, 0, firstBuffer.length)
const messages = await parse([firstBuffer, secondBuffer])
verifyMessages(messages)
}
describe('receives both messages when packet is split', function () {
it('in the middle', function () {
return splitAndVerifyTwoMessages(11)
})
it('at the front', function () {
return Promise.all([
splitAndVerifyTwoMessages(fullBuffer.length - 1),
splitAndVerifyTwoMessages(fullBuffer.length - 4),
splitAndVerifyTwoMessages(fullBuffer.length - 6)
])
})
it('at the end', function () {
return Promise.all([
splitAndVerifyTwoMessages(8),
splitAndVerifyTwoMessages(1)
])
})
})
})
})

@ -0,0 +1,328 @@
import { Transform, TransformCallback, TransformOptions } from 'stream';
import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password } from './messages';
import { BufferReader } from './BufferReader';
import assert from 'assert'
// every message is prefixed with a single byte
const CODE_LENGTH = 1;
// every message has an int32 length which includes itself but does
// NOT include the code in the length
const LEN_LENGTH = 4;
const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH;
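// Illustrative example (not part of the original diff): a ReadyForQuery message
// arrives as [0x5a, 0x00, 0x00, 0x00, 0x05, 0x49] - code 'Z', a length of 5
// (the 4 length bytes plus 1 byte of payload), and the status byte 'I' - so the
// full frame occupies CODE_LENGTH + length = 6 bytes.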
export type Packet = {
code: number;
packet: Buffer;
}
const emptyBuffer = Buffer.allocUnsafe(0);
type StreamOptions = TransformOptions & {
mode: Mode
}
const enum MessageCodes {
DataRow = 0x44, // D
ParseComplete = 0x31, // 1
BindComplete = 0x32, // 2
CloseComplete = 0x33, // 3
CommandComplete = 0x43, // C
ReadyForQuery = 0x5a, // Z
NoData = 0x6e, // n
NotificationResponse = 0x41, // A
AuthenticationResponse = 0x52, // R
ParameterStatus = 0x53, // S
BackendKeyData = 0x4b, // K
ErrorMessage = 0x45, // E
NoticeMessage = 0x4e, // N
RowDescriptionMessage = 0x54, // T
PortalSuspended = 0x73, // s
ReplicationStart = 0x57, // W
EmptyQuery = 0x49, // I
CopyIn = 0x47, // G
CopyOut = 0x48, // H
CopyDone = 0x63, // c
CopyData = 0x64, // d
}
export class PgPacketStream extends Transform {
private remainingBuffer: Buffer = emptyBuffer;
private reader = new BufferReader();
private mode: Mode;
constructor(opts?: StreamOptions) {
super({
...opts,
readableObjectMode: true
})
if (opts?.mode === 'binary') {
throw new Error('Binary mode not supported yet')
}
this.mode = opts?.mode || 'text';
}
public _transform(buffer: Buffer, encoding: string, callback: TransformCallback) {
let combinedBuffer = buffer;
if (this.remainingBuffer.byteLength) {
combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength);
this.remainingBuffer.copy(combinedBuffer)
buffer.copy(combinedBuffer, this.remainingBuffer.byteLength)
}
let offset = 0;
while ((offset + HEADER_LENGTH) <= combinedBuffer.byteLength) {
// code is 1 byte long - it identifies the message type
const code = combinedBuffer[offset];
// length is 1 Uint32BE - it is the length of the message EXCLUDING the code
const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH);
const fullMessageLength = CODE_LENGTH + length;
if (fullMessageLength + offset <= combinedBuffer.byteLength) {
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer);
this.push(message)
offset += fullMessageLength;
} else {
break;
}
}
if (offset === combinedBuffer.byteLength) {
this.remainingBuffer = emptyBuffer;
} else {
this.remainingBuffer = combinedBuffer.slice(offset)
}
callback(null);
}
private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage {
switch (code) {
case MessageCodes.BindComplete:
return bindComplete;
case MessageCodes.ParseComplete:
return parseComplete;
case MessageCodes.CloseComplete:
return closeComplete;
case MessageCodes.NoData:
return noData;
case MessageCodes.PortalSuspended:
return portalSuspended;
case MessageCodes.CopyDone:
return copyDone;
case MessageCodes.ReplicationStart:
return replicationStart;
case MessageCodes.EmptyQuery:
return emptyQuery;
case MessageCodes.DataRow:
return this.parseDataRowMessage(offset, length, bytes);
case MessageCodes.CommandComplete:
return this.parseCommandCompleteMessage(offset, length, bytes);
case MessageCodes.ReadyForQuery:
return this.parseReadyForQueryMessage(offset, length, bytes);
case MessageCodes.NotificationResponse:
return this.parseNotificationMessage(offset, length, bytes);
case MessageCodes.AuthenticationResponse:
return this.parseAuthenticationResponse(offset, length, bytes);
case MessageCodes.ParameterStatus:
return this.parseParameterStatusMessage(offset, length, bytes);
case MessageCodes.BackendKeyData:
return this.parseBackendKeyData(offset, length, bytes);
case MessageCodes.ErrorMessage:
return this.parseErrorMessage(offset, length, bytes, MessageName.error);
case MessageCodes.NoticeMessage:
return this.parseErrorMessage(offset, length, bytes, MessageName.notice);
case MessageCodes.RowDescriptionMessage:
return this.parseRowDescriptionMessage(offset, length, bytes);
case MessageCodes.CopyIn:
return this.parseCopyInMessage(offset, length, bytes);
case MessageCodes.CopyOut:
return this.parseCopyOutMessage(offset, length, bytes);
case MessageCodes.CopyData:
return this.parseCopyData(offset, length, bytes);
default:
assert.fail(`unknown message code: ${code.toString(16)}`)
}
}
public _flush(callback: TransformCallback) {
this._transform(Buffer.alloc(0), 'utf-8', callback)
}
private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes);
const status = this.reader.string(1);
return new ReadyForQueryMessage(length, status)
}
private parseCommandCompleteMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes);
const text = this.reader.cstring();
return new CommandCompleteMessage(length, text);
}
private parseCopyData(offset: number, length: number, bytes: Buffer) {
const chunk = bytes.slice(offset, offset + (length - 4));
return new CopyDataMessage(length, chunk);
}
private parseCopyInMessage(offset: number, length: number, bytes: Buffer) {
return this.parseCopyMessage(offset, length, bytes, MessageName.copyInResponse)
}
private parseCopyOutMessage(offset: number, length: number, bytes: Buffer) {
return this.parseCopyMessage(offset, length, bytes, MessageName.copyOutResponse)
}
private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: MessageName) {
this.reader.setBuffer(offset, bytes);
const isBinary = this.reader.byte() !== 0;
const columnCount = this.reader.int16()
const message = new CopyResponse(length, messageName, isBinary, columnCount);
for (let i = 0; i < columnCount; i++) {
message.columnTypes[i] = this.reader.int16();
}
return message;
}
private parseNotificationMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes);
const processId = this.reader.int32();
const channel = this.reader.cstring();
const payload = this.reader.cstring();
return new NotificationResponseMessage(length, processId, channel, payload);
}
private parseRowDescriptionMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes);
const fieldCount = this.reader.int16()
const message = new RowDescriptionMessage(length, fieldCount);
for (let i = 0; i < fieldCount; i++) {
message.fields[i] = this.parseField()
}
return message;
}
private parseField(): Field {
const name = this.reader.cstring()
const tableID = this.reader.int32()
const columnID = this.reader.int16()
const dataTypeID = this.reader.int32()
const dataTypeSize = this.reader.int16()
const dataTypeModifier = this.reader.int32()
const mode = this.reader.int16() === 0 ? 'text' : 'binary';
return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode)
}
private parseDataRowMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes);
const fieldCount = this.reader.int16();
const fields: any[] = new Array(fieldCount);
for (let i = 0; i < fieldCount; i++) {
const len = this.reader.int32();
if (len === -1) {
fields[i] = null
} else if (this.mode === 'text') {
fields[i] = this.reader.string(len)
}
}
return new DataRowMessage(length, fields);
}
private parseParameterStatusMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes);
const name = this.reader.cstring();
const value = this.reader.cstring()
return new ParameterStatusMessage(length, name, value)
}
private parseBackendKeyData(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes);
const processID = this.reader.int32()
const secretKey = this.reader.int32()
return new BackendKeyDataMessage(length, processID, secretKey)
}
public parseAuthenticationResponse(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes);
const code = this.reader.int32()
// TODO(bmc): maybe better types here
const message: BackendMessage & any = {
name: MessageName.authenticationOk,
length,
};
switch (code) {
case 0: // AuthenticationOk
break;
case 3: // AuthenticationCleartextPassword
if (message.length === 8) {
message.name = MessageName.authenticationCleartextPassword
}
break
case 5: // AuthenticationMD5Password
if (message.length === 12) {
message.name = MessageName.authenticationMD5Password
const salt = this.reader.bytes(4);
return new AuthenticationMD5Password(length, salt);
}
break
case 10: // AuthenticationSASL
message.name = MessageName.authenticationSASL
message.mechanisms = []
let mechanism: string;
do {
mechanism = this.reader.cstring()
if (mechanism) {
message.mechanisms.push(mechanism)
}
} while (mechanism)
break;
case 11: // AuthenticationSASLContinue
message.name = MessageName.authenticationSASLContinue
message.data = this.reader.string(length - 4)
break;
case 12: // AuthenticationSASLFinal
message.name = MessageName.authenticationSASLFinal
message.data = this.reader.string(length - 4)
break;
default:
throw new Error('Unknown authenticationOk message type ' + code)
}
return message;
}
private parseErrorMessage(offset: number, length: number, bytes: Buffer, name: MessageName) {
this.reader.setBuffer(offset, bytes);
var fields: Record<string, string> = {}
var fieldType = this.reader.string(1)
while (fieldType !== '\0') {
fields[fieldType] = this.reader.cstring()
fieldType = this.reader.string(1)
}
// the msg is an Error instance
var message = new DatabaseError(fields.M, length, name)
message.severity = fields.S
message.code = fields.C
message.detail = fields.D
message.hint = fields.H
message.position = fields.P
message.internalPosition = fields.p
message.internalQuery = fields.q
message.where = fields.W
message.schema = fields.s
message.table = fields.t
message.column = fields.c
message.dataType = fields.d
message.constraint = fields.n
message.file = fields.F
message.line = fields.L
message.routine = fields.R
return message;
}
}

@ -0,0 +1,169 @@
export type Mode = 'text' | 'binary';
export const enum MessageName {
parseComplete = 'parseComplete',
bindComplete = 'bindComplete',
closeComplete = 'closeComplete',
noData = 'noData',
portalSuspended = 'portalSuspended',
replicationStart = 'replicationStart',
emptyQuery = 'emptyQuery',
copyDone = 'copyDone',
copyData = 'copyData',
rowDescription = 'rowDescription',
parameterStatus = 'parameterStatus',
backendKeyData = 'backendKeyData',
notification = 'notification',
readyForQuery = 'readyForQuery',
commandComplete = 'commandComplete',
dataRow = 'dataRow',
copyInResponse = 'copyInResponse',
copyOutResponse = 'copyOutResponse',
authenticationOk = 'authenticationOk',
authenticationMD5Password = 'authenticationMD5Password',
authenticationCleartextPassword = 'authenticationCleartextPassword',
authenticationSASL = 'authenticationSASL',
authenticationSASLContinue = 'authenticationSASLContinue',
authenticationSASLFinal = 'authenticationSASLFinal',
error = 'error',
notice = 'notice',
}
export interface BackendMessage {
name: MessageName;
length: number;
}
export const parseComplete: BackendMessage = {
name: MessageName.parseComplete,
length: 5,
};
export const bindComplete: BackendMessage = {
name: MessageName.bindComplete,
length: 5,
}
export const closeComplete: BackendMessage = {
name: MessageName.closeComplete,
length: 5,
}
export const noData: BackendMessage = {
name: MessageName.noData,
length: 5
}
export const portalSuspended: BackendMessage = {
name: MessageName.portalSuspended,
length: 5,
}
export const replicationStart: BackendMessage = {
name: MessageName.replicationStart,
length: 4,
}
export const emptyQuery: BackendMessage = {
name: MessageName.emptyQuery,
length: 4,
}
export const copyDone: BackendMessage = {
name: MessageName.copyDone,
length: 4,
}
export class DatabaseError extends Error {
public severity: string | undefined;
public code: string | undefined;
public detail: string | undefined;
public hint: string | undefined;
public position: string | undefined;
public internalPosition: string | undefined;
public internalQuery: string | undefined;
public where: string | undefined;
public schema: string | undefined;
public table: string | undefined;
public column: string | undefined;
public dataType: string | undefined;
public constraint: string | undefined;
public file: string | undefined;
public line: string | undefined;
public routine: string | undefined;
constructor(message: string, public readonly length: number, public readonly name: MessageName) {
super(message)
}
}
export class CopyDataMessage {
public readonly name = MessageName.copyData;
constructor(public readonly length: number, public readonly chunk: Buffer) {
}
}
export class CopyResponse {
public readonly columnTypes: number[];
constructor(public readonly length: number, public readonly name: MessageName, public readonly binary: boolean, columnCount: number) {
this.columnTypes = new Array(columnCount);
}
}
export class Field {
constructor(public readonly name: string, public readonly tableID: number, public readonly columnID: number, public readonly dataTypeID: number, public readonly dataTypeSize: number, public readonly dataTypeModifier: number, public readonly format: Mode) {
}
}
export class RowDescriptionMessage {
public readonly name: MessageName = MessageName.rowDescription;
public readonly fields: Field[];
constructor(public readonly length: number, public readonly fieldCount: number) {
this.fields = new Array(this.fieldCount)
}
}
export class ParameterStatusMessage {
public readonly name: MessageName = MessageName.parameterStatus;
constructor(public readonly length: number, public readonly parameterName: string, public readonly parameterValue: string) {
}
}
export class AuthenticationMD5Password implements BackendMessage {
public readonly name: MessageName = MessageName.authenticationMD5Password;
constructor(public readonly length: number, public readonly salt: Buffer) {
}
}
export class BackendKeyDataMessage {
public readonly name: MessageName = MessageName.backendKeyData;
constructor(public readonly length: number, public readonly processID: number, public readonly secretKey: number) {
}
}
export class NotificationResponseMessage {
public readonly name: MessageName = MessageName.notification;
constructor(public readonly length: number, public readonly processId: number, public readonly channel: string, public readonly payload: string) {
}
}
export class ReadyForQueryMessage {
public readonly name: MessageName = MessageName.readyForQuery;
constructor(public readonly length: number, public readonly status: string) {
}
}
export class CommandCompleteMessage {
public readonly name: MessageName = MessageName.commandComplete
constructor(public readonly length: number, public readonly text: string) {
}
}
export class DataRowMessage {
public readonly fieldCount: number;
public readonly name: MessageName = MessageName.dataRow
constructor(public length: number, public fields: any[]) {
this.fieldCount = fields.length;
}
}

@ -0,0 +1,79 @@
export default class BufferList {
constructor(public buffers: Buffer[] = []) {
}
public add(buffer: Buffer, front?: boolean) {
this.buffers[front ? 'unshift' : 'push'](buffer)
return this
}
public addInt16(val: number, front?: boolean) {
return this.add(Buffer.from([(val >>> 8), (val >>> 0)]), front)
}
public getByteLength(initial?: number) {
return this.buffers.reduce(function (previous, current) {
return previous + current.length
}, initial || 0)
}
public addInt32(val: number, first?: boolean) {
return this.add(Buffer.from([
(val >>> 24 & 0xFF),
(val >>> 16 & 0xFF),
(val >>> 8 & 0xFF),
(val >>> 0 & 0xFF)
]), first)
}
public addCString(val: string, front?: boolean) {
var len = Buffer.byteLength(val)
var buffer = Buffer.alloc(len + 1)
buffer.write(val)
buffer[len] = 0
return this.add(buffer, front)
}
public addString(val: string, front?: boolean) {
var len = Buffer.byteLength(val)
var buffer = Buffer.alloc(len)
buffer.write(val)
return this.add(buffer, front)
}
public addChar(char: string, first?: boolean) {
return this.add(Buffer.from(char, 'utf8'), first)
}
public addByte(byte: number) {
return this.add(Buffer.from([byte]))
}
public join(appendLength?: boolean, char?: string): Buffer {
var length = this.getByteLength()
if (appendLength) {
this.addInt32(length + 4, true)
return this.join(false, char)
}
if (char) {
this.addChar(char, true)
length++
}
var result = Buffer.alloc(length)
var index = 0
this.buffers.forEach(function (buffer) {
buffer.copy(result, index, 0)
index += buffer.length
})
return result
}
public static concat(): Buffer {
var total = new BufferList()
for (var i = 0; i < arguments.length; i++) {
total.add(arguments[i])
}
return total.join()
}
}
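For reference, a worked example of join() with a length prefix and a code character (illustrative, not part of the diff; the import path is assumed):

import BufferList from './buffer-list'

// the authenticationOk test buffer: code 'R', int32 length 8, int32 auth code 0
const buf = new BufferList().addInt32(0).join(true, 'R')
// buf => <Buffer 52 00 00 00 08 00 00 00 00>
// PgPacketStream parses this as { name: 'authenticationOk', length: 8 }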

@ -0,0 +1,183 @@
// http://developer.postgresql.org/pgdocs/postgres/protocol-message-formats.html
import BufferList from './buffer-list'
const buffers = {
readyForQuery: function () {
return new BufferList()
.add(Buffer.from('I'))
.join(true, 'Z')
},
authenticationOk: function () {
return new BufferList()
.addInt32(0)
.join(true, 'R')
},
authenticationCleartextPassword: function () {
return new BufferList()
.addInt32(3)
.join(true, 'R')
},
authenticationMD5Password: function () {
return new BufferList()
.addInt32(5)
.add(Buffer.from([1, 2, 3, 4]))
.join(true, 'R')
},
authenticationSASL: function () {
return new BufferList()
.addInt32(10)
.addCString('SCRAM-SHA-256')
.addCString('')
.join(true, 'R')
},
authenticationSASLContinue: function () {
return new BufferList()
.addInt32(11)
.addString('data')
.join(true, 'R')
},
authenticationSASLFinal: function () {
return new BufferList()
.addInt32(12)
.addString('data')
.join(true, 'R')
},
parameterStatus: function (name: string, value: string) {
return new BufferList()
.addCString(name)
.addCString(value)
.join(true, 'S')
},
backendKeyData: function (processID: number, secretKey: number) {
return new BufferList()
.addInt32(processID)
.addInt32(secretKey)
.join(true, 'K')
},
commandComplete: function (string: string) {
return new BufferList()
.addCString(string)
.join(true, 'C')
},
rowDescription: function (fields: any[]) {
fields = fields || []
var buf = new BufferList()
buf.addInt16(fields.length)
fields.forEach(function (field) {
buf.addCString(field.name)
.addInt32(field.tableID || 0)
.addInt16(field.attributeNumber || 0)
.addInt32(field.dataTypeID || 0)
.addInt16(field.dataTypeSize || 0)
.addInt32(field.typeModifier || 0)
.addInt16(field.formatCode || 0)
})
return buf.join(true, 'T')
},
dataRow: function (columns: any[]) {
columns = columns || []
var buf = new BufferList()
buf.addInt16(columns.length)
columns.forEach(function (col) {
if (col == null) {
buf.addInt32(-1)
} else {
var strBuf = Buffer.from(col, 'utf8')
buf.addInt32(strBuf.length)
buf.add(strBuf)
}
})
return buf.join(true, 'D')
},
error: function (fields: any) {
return buffers.errorOrNotice(fields).join(true, 'E')
},
notice: function (fields: any) {
return buffers.errorOrNotice(fields).join(true, 'N')
},
errorOrNotice: function (fields: any) {
fields = fields || []
var buf = new BufferList()
fields.forEach(function (field: any) {
buf.addChar(field.type)
buf.addCString(field.value)
})
return buf.add(Buffer.from([0]))// terminator
},
parseComplete: function () {
return new BufferList().join(true, '1')
},
bindComplete: function () {
return new BufferList().join(true, '2')
},
notification: function (id: number, channel: string, payload: string) {
return new BufferList()
.addInt32(id)
.addCString(channel)
.addCString(payload)
.join(true, 'A')
},
emptyQuery: function () {
return new BufferList().join(true, 'I')
},
portalSuspended: function () {
return new BufferList().join(true, 's')
},
closeComplete: function () {
return new BufferList().join(true, '3')
},
copyIn: function (cols: number) {
const list = new BufferList()
// text mode
.addByte(0)
// column count
.addInt16(cols);
for (let i = 0; i < cols; i++) {
list.addInt16(i);
}
return list.join(true, 'G')
},
copyOut: function (cols: number) {
const list = new BufferList()
// text mode
.addByte(0)
// column count
.addInt16(cols);
for (let i = 0; i < cols; i++) {
list.addInt16(i);
}
return list.join(true, 'H')
},
copyData: function (bytes: Buffer) {
return new BufferList().add(bytes).join(true, 'd');
},
copyDone: function () {
return new BufferList().join(true, 'c')
}
}
export default buffers

@ -0,0 +1 @@
declare module 'chunky'

@ -0,0 +1,24 @@
{
"compilerOptions": {
"module": "commonjs",
"esModuleInterop": true,
"allowSyntheticDefaultImports": true,
"strict": true,
"target": "es6",
"noImplicitAny": true,
"moduleResolution": "node",
"sourceMap": true,
"outDir": "dist",
"baseUrl": ".",
"declaration": true,
"paths": {
"*": [
"node_modules/*",
"src/types/*"
]
}
},
"include": [
"src/**/*"
]
}

packages/pg/bench.js (new file): 68 lines

@ -0,0 +1,68 @@
const pg = require("./lib");
const pool = new pg.Pool()
const params = {
text:
"select typname, typnamespace, typowner, typlen, typbyval, typcategory, typispreferred, typisdefined, typdelim, typrelid, typelem, typarray from pg_type where typtypmod = $1 and typisdefined = $2",
values: [-1, true]
};
const insert = {
text: 'INSERT INTO foobar(name, age) VALUES ($1, $2)',
values: ['brian', 100]
}
const seq = {
text: 'SELECT * FROM generate_series(1, 1000)'
}
const exec = async (client, q) => {
const result = await client.query({
text: q.text,
values: q.values,
rowMode: "array"
});
};
const bench = async (client, q, time) => {
let start = Date.now();
let count = 0;
while (true) {
await exec(client, q);
count++;
if (Date.now() - start > time) {
return count;
}
}
};
const run = async () => {
const client = new pg.Client();
await client.connect();
await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)')
await bench(client, params, 1000);
console.log("warmup done");
const seconds = 5;
let queries = await bench(client, params, seconds * 1000);
console.log('')
console.log("little queries:", queries);
console.log("qps", queries / seconds);
console.log("on my laptop best so far seen 733 qps")
console.log('')
queries = await bench(client, seq, seconds * 1000);
console.log("sequence queries:", queries);
console.log("qps", queries / seconds);
console.log("on my laptop best so far seen 1192 qps")
console.log('')
queries = await bench(client, insert, seconds * 1000);
console.log("insert queries:", queries);
console.log("qps", queries / seconds);
console.log("on my laptop best so far seen 5600 qps")
await client.end();
};
run().catch(e => console.error(e) || process.exit(-1));

@ -18,6 +18,9 @@ var ConnectionParameters = require('./connection-parameters')
var Query = require('./query')
var defaults = require('./defaults')
var Connection = require('./connection')
if (process.env.PG_FAST_CONNECTION) {
Connection = require('./connection-fast')
}
var Client = function (config) {
EventEmitter.call(this)
@ -112,7 +115,7 @@ Client.prototype._connect = function (callback) {
con.startup(self.getStartupConf())
})
function checkPgPass (cb) {
function checkPgPass(cb) {
return function (msg) {
if (typeof self.password === 'function') {
self._Promise.resolve()
@ -492,7 +495,7 @@ Client.prototype.query = function (config, values, callback) {
// we already returned an error,
// just do nothing if query completes
query.callback = () => {}
query.callback = () => { }
// Remove from queue
var index = this.queryQueue.indexOf(query)
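The first hunk in this file gates the new code path behind an environment variable; a hypothetical opt-in looks like this (the flag must be set before pg is loaded, which is why require is used rather than a hoisted import):

process.env.PG_FAST_CONNECTION = '1'
const pg = require('pg')
const pool = new pg.Pool()
// queries now flow through lib/connection-fast.js and pg-packet-stream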

@ -0,0 +1,373 @@
'use strict'
/**
* Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com)
* All rights reserved.
*
* This source code is licensed under the MIT license found in the
* README.md file in the root directory of this source tree.
*/
var net = require('net')
var EventEmitter = require('events').EventEmitter
var util = require('util')
var Writer = require('buffer-writer')
// eslint-disable-next-line
var PacketStream = require('pg-packet-stream')
var TEXT_MODE = 0
// TODO(bmc) support binary mode here
// var BINARY_MODE = 1
console.log('using faster connection')
var Connection = function (config) {
EventEmitter.call(this)
config = config || {}
this.stream = config.stream || new net.Socket()
this.stream.setNoDelay(true)
this._keepAlive = config.keepAlive
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
this.lastBuffer = false
this.lastOffset = 0
this.buffer = null
this.offset = null
this.encoding = config.encoding || 'utf8'
this.parsedStatements = {}
this.writer = new Writer()
this.ssl = config.ssl || false
this._ending = false
this._mode = TEXT_MODE
this._emitMessage = false
var self = this
this.on('newListener', function (eventName) {
if (eventName === 'message') {
self._emitMessage = true
}
})
}
util.inherits(Connection, EventEmitter)
Connection.prototype.connect = function (port, host) {
var self = this
if (this.stream.readyState === 'closed') {
this.stream.connect(port, host)
} else if (this.stream.readyState === 'open') {
this.emit('connect')
}
this.stream.on('connect', function () {
if (self._keepAlive) {
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
}
self.emit('connect')
})
const reportStreamError = function (error) {
// errors about disconnections should be ignored during disconnect
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
return
}
self.emit('error', error)
}
this.stream.on('error', reportStreamError)
this.stream.on('close', function () {
self.emit('end')
})
if (!this.ssl) {
return this.attachListeners(this.stream)
}
this.stream.once('data', function (buffer) {
var responseCode = buffer.toString('utf8')
switch (responseCode) {
case 'N': // Server does not support SSL connections
return self.emit('error', new Error('The server does not support SSL connections'))
case 'S': // Server supports SSL connections, continue with a secure connection
break
default:
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
return self.emit('error', new Error('There was an error establishing an SSL connection'))
}
var tls = require('tls')
const options = {
socket: self.stream,
checkServerIdentity: self.ssl.checkServerIdentity || tls.checkServerIdentity,
rejectUnauthorized: self.ssl.rejectUnauthorized,
ca: self.ssl.ca,
pfx: self.ssl.pfx,
key: self.ssl.key,
passphrase: self.ssl.passphrase,
cert: self.ssl.cert,
secureOptions: self.ssl.secureOptions,
NPNProtocols: self.ssl.NPNProtocols
}
if (net.isIP(host) === 0) {
options.servername = host
}
self.stream = tls.connect(options)
self.attachListeners(self.stream)
self.stream.on('error', reportStreamError)
self.emit('sslconnect')
})
}
Connection.prototype.attachListeners = function (stream) {
var self = this
const mode = this._mode === TEXT_MODE ? 'text' : 'binary'
const packetStream = new PacketStream.PgPacketStream({ mode })
this.stream.pipe(packetStream)
packetStream.on('data', (msg) => {
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
if (self._emitMessage) {
self.emit('message', msg)
}
self.emit(eventName, msg)
})
stream.on('end', function () {
self.emit('end')
})
}
Connection.prototype.requestSsl = function () {
var bodyBuffer = this.writer
.addInt16(0x04d2)
.addInt16(0x162f)
.flush()
var length = bodyBuffer.length + 4
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join()
this.stream.write(buffer)
}
Connection.prototype.startup = function (config) {
var writer = this.writer.addInt16(3).addInt16(0)
Object.keys(config).forEach(function (key) {
var val = config[key]
writer.addCString(key).addCString(val)
})
writer.addCString('client_encoding').addCString("'utf-8'")
var bodyBuffer = writer.addCString('').flush()
// this message is sent without a code
var length = bodyBuffer.length + 4
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join()
this.stream.write(buffer)
}
Connection.prototype.cancel = function (processID, secretKey) {
var bodyBuffer = this.writer
.addInt16(1234)
.addInt16(5678)
.addInt32(processID)
.addInt32(secretKey)
.flush()
var length = bodyBuffer.length + 4
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join()
this.stream.write(buffer)
}
Connection.prototype.password = function (password) {
// 0x70 = 'p'
this._send(0x70, this.writer.addCString(password))
}
Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) {
// 0x70 = 'p'
this.writer
.addCString(mechanism)
.addInt32(Buffer.byteLength(initialResponse))
.addString(initialResponse)
this._send(0x70)
}
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
// 0x70 = 'p'
this.writer.addString(additionalData)
this._send(0x70)
}
Connection.prototype._send = function (code, more) {
if (!this.stream.writable) {
return false
}
return this.stream.write(this.writer.flush(code))
}
Connection.prototype.query = function (text) {
// 0x51 = Q
this.stream.write(this.writer.addCString(text).flush(0x51))
}
// send parse message
Connection.prototype.parse = function (query) {
// expect something like this:
// { name: 'queryName',
// text: 'select * from blah',
// types: ['int8', 'bool'] }
// normalize missing query names to allow for null
query.name = query.name || ''
if (query.name.length > 63) {
/* eslint-disable no-console */
console.error('Warning! Postgres only supports 63 characters for query names.')
console.error('You supplied %s (%s)', query.name, query.name.length)
console.error('This can cause conflicts and silent errors executing queries')
/* eslint-enable no-console */
}
// normalize null type array
query.types = query.types || []
var len = query.types.length
var buffer = this.writer
.addCString(query.name) // name of query
.addCString(query.text) // actual query text
.addInt16(len)
for (var i = 0; i < len; i++) {
buffer.addInt32(query.types[i])
}
var code = 0x50
this._send(code)
this.flush()
}
// send bind message
// "more" === true to buffer the message until flush() is called
Connection.prototype.bind = function (config) {
// normalize config
config = config || {}
config.portal = config.portal || ''
config.statement = config.statement || ''
config.binary = config.binary || false
var values = config.values || []
var len = values.length
var useBinary = false
for (var j = 0; j < len; j++) {
useBinary |= values[j] instanceof Buffer
}
var buffer = this.writer.addCString(config.portal).addCString(config.statement)
if (!useBinary) {
buffer.addInt16(0)
} else {
buffer.addInt16(len)
for (j = 0; j < len; j++) {
buffer.addInt16(values[j] instanceof Buffer)
}
}
buffer.addInt16(len)
for (var i = 0; i < len; i++) {
var val = values[i]
if (val === null || typeof val === 'undefined') {
buffer.addInt32(-1)
} else if (val instanceof Buffer) {
buffer.addInt32(val.length)
buffer.add(val)
} else {
buffer.addInt32(Buffer.byteLength(val))
buffer.addString(val)
}
}
if (config.binary) {
buffer.addInt16(1) // format codes to use binary
buffer.addInt16(1)
} else {
buffer.addInt16(0) // format codes to use text
}
// 0x42 = 'B'
this._send(0x42)
this.flush()
}
// send execute message
// "more" === true to buffer the message until flush() is called
Connection.prototype.execute = function (config) {
config = config || {}
config.portal = config.portal || ''
config.rows = config.rows || ''
this.writer.addCString(config.portal).addInt32(config.rows)
// 0x45 = 'E'
this._send(0x45)
this.flush()
}
var emptyBuffer = Buffer.alloc(0)
const flushBuffer = Buffer.from([0x48, 0x00, 0x00, 0x00, 0x04])
Connection.prototype.flush = function () {
if (this.stream.writable) {
this.stream.write(flushBuffer)
}
}
const syncBuffer = Buffer.from([0x53, 0x00, 0x00, 0x00, 0x04])
Connection.prototype.sync = function () {
this._ending = true
// clear out any pending data in the writer
this.writer.clear()
if (this.stream.writable) {
this.stream.write(syncBuffer)
this.stream.write(flushBuffer)
}
}
const END_BUFFER = Buffer.from([0x58, 0x00, 0x00, 0x00, 0x04])
Connection.prototype.end = function () {
// 0x58 = 'X'
this.writer.clear()
this._ending = true
return this.stream.write(END_BUFFER, () => {
this.stream.end()
})
}
Connection.prototype.close = function (msg) {
this.writer.addCString(msg.type + (msg.name || ''))
this._send(0x43)
}
Connection.prototype.describe = function (msg) {
this.writer.addCString(msg.type + (msg.name || ''))
this._send(0x44)
this.flush()
}
Connection.prototype.sendCopyFromChunk = function (chunk) {
this.stream.write(this.writer.add(chunk).flush(0x64))
}
Connection.prototype.endCopyFrom = function () {
this.stream.write(this.writer.add(emptyBuffer).flush(0x63))
}
Connection.prototype.sendCopyFail = function (msg) {
// this.stream.write(this.writer.add(emptyBuffer).flush(0x66));
this.writer.addCString(msg)
this._send(0x66)
}
module.exports = Connection

@ -17,8 +17,8 @@ var Result = function (rowMode, types) {
this.rowCount = null
this.oid = null
this.rows = []
this.fields = []
this._parsers = []
this.fields = undefined
this._parsers = undefined
this._types = types
this.RowCtor = null
this.rowAsArray = rowMode === 'array'
@ -53,13 +53,13 @@ Result.prototype.addCommandComplete = function (msg) {
}
Result.prototype._parseRowAsArray = function (rowData) {
var row = []
var row = new Array(rowData.length)
for (var i = 0, len = rowData.length; i < len; i++) {
var rawValue = rowData[i]
if (rawValue !== null) {
row.push(this._parsers[i](rawValue))
row[i] = this._parsers[i](rawValue)
} else {
row.push(null)
row[i] = null
}
}
return row
@ -88,15 +88,17 @@ Result.prototype.addFields = function (fieldDescriptions) {
// multiple query statements in 1 action can result in multiple sets
// of rowDescriptions...eg: 'select NOW(); select 1::int;'
// you need to reset the fields
this.fields = fieldDescriptions
if (this.fields.length) {
this.fields = []
this._parsers = []
this._parsers = new Array(fieldDescriptions.length)
}
for (var i = 0; i < fieldDescriptions.length; i++) {
var desc = fieldDescriptions[i]
this.fields.push(desc)
var parser = (this._types || types).getTypeParser(desc.dataTypeID, desc.format || 'text')
this._parsers.push(parser)
if (this._types) {
this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text')
} else {
this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text')
}
}
}

@ -22,6 +22,7 @@
"buffer-writer": "2.0.0",
"packet-reader": "1.0.0",
"pg-connection-string": "0.1.3",
"pg-packet-stream": "^1.0.0",
"pg-pool": "^2.0.7",
"pg-types": "^2.1.0",
"pgpass": "1.x",

@ -4,6 +4,38 @@ var pg = helper.pg
var suite = new helper.Suite()
suite.test('null and undefined are both inserted as NULL', function (done) {
const pool = new pg.Pool()
pool.connect(
assert.calls(function (err, client, release) {
assert(!err)
client.query(
'CREATE TEMP TABLE my_nulls(a varchar(1), b varchar(1), c integer, d integer, e date, f date)'
)
client.query(
'INSERT INTO my_nulls(a,b,c,d,e,f) VALUES ($1,$2,$3,$4,$5,$6)',
[null, undefined, null, undefined, null, undefined]
)
client.query(
'SELECT * FROM my_nulls',
assert.calls(function (err, result) {
console.log(err)
assert.ifError(err)
assert.equal(result.rows.length, 1)
assert.isNull(result.rows[0].a)
assert.isNull(result.rows[0].b)
assert.isNull(result.rows[0].c)
assert.isNull(result.rows[0].d)
assert.isNull(result.rows[0].e)
assert.isNull(result.rows[0].f)
pool.end(done)
release()
})
)
})
)
})
suite.test('pool callback behavior', done => {
// test weird callback behavior with node-pool
const pool = new pg.Pool()
@ -16,7 +48,7 @@ suite.test('pool callback behavior', done => {
})
suite.test('query timeout', (cb) => {
const pool = new pg.Pool({query_timeout: 1000})
const pool = new pg.Pool({ query_timeout: 1000 })
pool.connect().then((client) => {
client.query('SELECT pg_sleep(2)', assert.calls(function (err, result) {
assert(err)
@ -28,7 +60,7 @@ suite.test('query timeout', (cb) => {
})
suite.test('query recover from timeout', (cb) => {
const pool = new pg.Pool({query_timeout: 1000})
const pool = new pg.Pool({ query_timeout: 1000 })
pool.connect().then((client) => {
client.query('SELECT pg_sleep(20)', assert.calls(function (err, result) {
assert(err)
@ -46,7 +78,7 @@ suite.test('query recover from timeout', (cb) => {
})
suite.test('query no timeout', (cb) => {
const pool = new pg.Pool({query_timeout: 10000})
const pool = new pg.Pool({ query_timeout: 10000 })
pool.connect().then((client) => {
client.query('SELECT pg_sleep(1)', assert.calls(function (err, result) {
assert(!err)
@ -131,18 +163,18 @@ suite.test('raises error if cannot connect', function () {
suite.test('query errors are handled and do not bubble if callback is provided', function (done) {
const pool = new pg.Pool()
pool.connect(
assert.calls(function (err, client, release) {
assert(!err)
client.query(
'SELECT OISDJF FROM LEIWLISEJLSE',
assert.calls(function (err, result) {
assert.ok(err)
release()
pool.end(done)
})
)
})
)
assert.calls(function (err, client, release) {
assert(!err)
client.query(
'SELECT OISDJF FROM LEIWLISEJLSE',
assert.calls(function (err, result) {
assert.ok(err)
release()
pool.end(done)
})
)
})
)
}
)
@ -216,34 +248,3 @@ suite.test('can provide callback and config and parameters', function (done) {
})
)
})
suite.test('null and undefined are both inserted as NULL', function (done) {
const pool = new pg.Pool()
pool.connect(
assert.calls(function (err, client, release) {
assert(!err)
client.query(
'CREATE TEMP TABLE my_nulls(a varchar(1), b varchar(1), c integer, d integer, e date, f date)'
)
client.query(
'INSERT INTO my_nulls(a,b,c,d,e,f) VALUES ($1,$2,$3,$4,$5,$6)',
[null, undefined, null, undefined, null, undefined]
)
client.query(
'SELECT * FROM my_nulls',
assert.calls(function (err, result) {
assert(!err)
assert.equal(result.rows.length, 1)
assert.isNull(result.rows[0].a)
assert.isNull(result.rows[0].b)
assert.isNull(result.rows[0].c)
assert.isNull(result.rows[0].d)
assert.isNull(result.rows[0].e)
assert.isNull(result.rows[0].f)
pool.end(done)
release()
})
)
})
)
})

yarn.lock: 107 changes

@ -831,6 +831,11 @@
dependencies:
"@types/node" ">= 8"
"@types/chai@^4.2.7":
version "4.2.7"
resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.2.7.tgz#1c8c25cbf6e59ffa7d6b9652c78e547d9a41692d"
integrity sha512-luq8meHGYwvky0O7u0eQZdA7B4Wd9owUCqvbw2m3XCrCU8mplYOujMBbvyS547AxJkC+pGnd0Cm15eNxEUNU8g==
"@types/events@*":
version "3.0.0"
resolved "https://registry.yarnpkg.com/@types/events/-/events-3.0.0.tgz#2862f3f58a9a7f7c3e78d79f130dd4d71c25c2a7"
@ -850,7 +855,12 @@
resolved "https://registry.yarnpkg.com/@types/minimatch/-/minimatch-3.0.3.tgz#3dca0e3f33b200fc7d1139c0cd96c1268cadfd9d"
integrity sha512-tHq6qdbT9U1IRSGf14CL0pUlULksvY9OZ+5eEgl1N7t+OA3tGvNpxJCzuKQlsNgCVwbAs670L1vcVQi8j9HjnA==
"@types/node@*", "@types/node@>= 8":
"@types/mocha@^5.2.7":
version "5.2.7"
resolved "https://registry.yarnpkg.com/@types/mocha/-/mocha-5.2.7.tgz#315d570ccb56c53452ff8638738df60726d5b6ea"
integrity sha512-NYrtPht0wGzhwe9+/idPaBB+TqkY9AhTvOLMkThm0IoEfLaiVQZwBwyJ5puCkO3AUCWrmcoePjp2mbFocKy4SQ==
"@types/node@*", "@types/node@>= 8", "@types/node@^12.12.21":
version "12.12.21"
resolved "https://registry.yarnpkg.com/@types/node/-/node-12.12.21.tgz#aa44a6363291c7037111c47e4661ad210aded23f"
integrity sha512-8sRGhbpU+ck1n0PGAUgVrWrWdjSW2aqNeyC15W88GRsMpSwzv6RJGlLhE7s2RhVSOdyDmxbqlWSeThq4/7xqlA==
@ -993,6 +1003,11 @@ are-we-there-yet@~1.1.2:
delegates "^1.0.0"
readable-stream "^2.0.6"
arg@^4.1.0:
version "4.1.2"
resolved "https://registry.yarnpkg.com/arg/-/arg-4.1.2.tgz#e70c90579e02c63d80e3ad4e31d8bfdb8bd50064"
integrity sha512-+ytCkGcBtHZ3V2r2Z06AncYO8jz46UEamcspGoU8lHcEbpn6J77QK0vdWvChsclg/tM5XIJC5tnjmPp7Eq6Obg==
argparse@^1.0.7:
version "1.0.10"
resolved "https://registry.yarnpkg.com/argparse/-/argparse-1.0.10.tgz#bcd6791ea5ae09725e17e5ad988134cd40b3d911"
@ -1085,6 +1100,11 @@ assert-plus@1.0.0, assert-plus@^1.0.0:
resolved "https://registry.yarnpkg.com/assert-plus/-/assert-plus-1.0.0.tgz#f12e0f3c5d77b0b1cdd9146942e4e96c1e4dd525"
integrity sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU=
assertion-error@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/assertion-error/-/assertion-error-1.1.0.tgz#e60b6b0e8f301bd97e5375215bda406c85118c0b"
integrity sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==
assertions@~2.3.0:
version "2.3.4"
resolved "https://registry.yarnpkg.com/assertions/-/assertions-2.3.4.tgz#a9433ced1fce57cc999af0965d1008e96c2796e6"
@ -1348,6 +1368,18 @@ caseless@~0.12.0:
resolved "https://registry.yarnpkg.com/caseless/-/caseless-0.12.0.tgz#1b681c21ff84033c826543090689420d187151dc"
integrity sha1-G2gcIf+EAzyCZUMJBolCDRhxUdw=
chai@^4.2.0:
version "4.2.0"
resolved "https://registry.yarnpkg.com/chai/-/chai-4.2.0.tgz#760aa72cf20e3795e84b12877ce0e83737aa29e5"
integrity sha512-XQU3bhBukrOsQCuwZndwGcCVQHyZi53fQ6Ys1Fym7E4olpIqqZZhhoFJoaKVvV17lWQoXYwgWN2nF5crA8J2jw==
dependencies:
assertion-error "^1.1.0"
check-error "^1.0.2"
deep-eql "^3.0.1"
get-func-name "^2.0.0"
pathval "^1.1.0"
type-detect "^4.0.5"
chalk@^2.0.0, chalk@^2.0.1, chalk@^2.1.0, chalk@^2.3.1, chalk@^2.4.2:
version "2.4.2"
resolved "https://registry.yarnpkg.com/chalk/-/chalk-2.4.2.tgz#cd42541677a54333cf541a49108c1432b44c9424"
@ -1362,11 +1394,21 @@ chardet@^0.7.0:
resolved "https://registry.yarnpkg.com/chardet/-/chardet-0.7.0.tgz#90094849f0937f2eedc2425d0d28a9e5f0cbad9e"
integrity sha512-mT8iDcrh03qDGRRmoA2hmBJnxpllMR+0/0qlzjqZES6NdiWDcZkCNAk4rPFZ9Q85r27unkiNNg8ZOiwZXBHwcA==
check-error@^1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/check-error/-/check-error-1.0.2.tgz#574d312edd88bb5dd8912e9286dd6c0aed4aac82"
integrity sha1-V00xLt2Iu13YkS6Sht1sCu1KrII=
chownr@^1.1.1, chownr@^1.1.2:
version "1.1.3"
resolved "https://registry.yarnpkg.com/chownr/-/chownr-1.1.3.tgz#42d837d5239688d55f303003a508230fa6727142"
integrity sha512-i70fVHhmV3DtTl6nqvZOnIjbY0Pe4kAUjwHj8z0zAdgBtYrJyYwLKCCuRBQ5ppkyL0AkN7HKRnETdmdp1zqNXw==
chunky@^0.0.0:
version "0.0.0"
resolved "https://registry.yarnpkg.com/chunky/-/chunky-0.0.0.tgz#1e7580a23c083897d2ad662459e7efd8465f608a"
integrity sha1-HnWAojwIOJfSrWYkWefv2EZfYIo=
ci-info@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/ci-info/-/ci-info-2.0.0.tgz#67a9e964be31a51e15e5010d58e6f12834002f46"
@ -1755,6 +1797,13 @@ dedent@^0.7.0:
resolved "https://registry.yarnpkg.com/dedent/-/dedent-0.7.0.tgz#2495ddbaf6eb874abb0e1be9df22d2e5a544326c"
integrity sha1-JJXduvbrh0q7Dhvp3yLS5aVEMmw=
deep-eql@^3.0.1:
version "3.0.1"
resolved "https://registry.yarnpkg.com/deep-eql/-/deep-eql-3.0.1.tgz#dfc9404400ad1c8fe023e7da1df1c147c4b444df"
integrity sha512-+QeIQyN5ZuO+3Uk5DYh6/1eKO0m0YmJFGNmFHGACpf1ClL1nmlV/p4gNgbl2pJGxgXb4faqo6UE+M5ACEMyVcw==
dependencies:
type-detect "^4.0.0"
deep-is@~0.1.3:
version "0.1.3"
resolved "https://registry.yarnpkg.com/deep-is/-/deep-is-0.1.3.tgz#b369d6fb5dbc13eecf524f91b070feedc357cf34"
@ -1829,6 +1878,11 @@ diff@3.5.0:
resolved "https://registry.yarnpkg.com/diff/-/diff-3.5.0.tgz#800c0dd1e0a8bfbc95835c202ad220fe317e5a12"
integrity sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==
diff@^4.0.1:
version "4.0.1"
resolved "https://registry.yarnpkg.com/diff/-/diff-4.0.1.tgz#0c667cb467ebbb5cea7f14f135cc2dba7780a8ff"
integrity sha512-s2+XdvhPCOF01LRQBC8hf4vhbVmI2CGS5aZnxLJlT5FtdhPCDFq80q++zK2KlrVorVDdL5BOGZ/VfLrVtYNF+Q==
dir-glob@^2.2.2:
version "2.2.2"
resolved "https://registry.yarnpkg.com/dir-glob/-/dir-glob-2.2.2.tgz#fa09f0694153c8918b18ba0deafae94769fc50c4"
@ -2472,6 +2526,11 @@ get-caller-file@^2.0.1:
resolved "https://registry.yarnpkg.com/get-caller-file/-/get-caller-file-2.0.5.tgz#4f94412a82db32f36e3b0b9741f8a97feb031f7e"
integrity sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg==
get-func-name@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/get-func-name/-/get-func-name-2.0.0.tgz#ead774abee72e20409433a066366023dd6887a41"
integrity sha1-6td0q+5y4gQJQzoGY2YCPdaIekE=
get-pkg-repo@^1.0.0:
version "1.4.0"
resolved "https://registry.yarnpkg.com/get-pkg-repo/-/get-pkg-repo-1.4.0.tgz#c73b489c06d80cc5536c2c853f9e05232056972d"
@ -3454,6 +3513,11 @@ make-dir@^2.1.0:
pify "^4.0.1"
semver "^5.6.0"
make-error@^1.1.1:
version "1.3.5"
resolved "https://registry.yarnpkg.com/make-error/-/make-error-1.3.5.tgz#efe4e81f6db28cadd605c70f29c831b58ef776c8"
integrity sha512-c3sIjNUow0+8swNwVpqoH4YCShKNFkMaw6oH1mNS2haDZQqkeZFlHS3dhoeEbKKmJB4vXpJucU6oH75aDYeE9g==
make-fetch-happen@^5.0.0:
version "5.0.2"
resolved "https://registry.yarnpkg.com/make-fetch-happen/-/make-fetch-happen-5.0.2.tgz#aa8387104f2687edca01c8687ee45013d02d19bd"
@ -4268,6 +4332,11 @@ path-type@^3.0.0:
dependencies:
pify "^3.0.0"
pathval@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/pathval/-/pathval-1.1.0.tgz#b942e6d4bde653005ef6b71361def8727d0645e0"
integrity sha1-uULm1L3mUwBe9rcTYd74cn0GReA=
performance-now@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/performance-now/-/performance-now-2.1.0.tgz#6309f4e0e5fa913ec1c69307ae364b4b377c9e7b"
@ -4959,6 +5028,14 @@ source-map-resolve@^0.5.0:
source-map-url "^0.4.0"
urix "^0.1.0"
source-map-support@^0.5.6:
version "0.5.16"
resolved "https://registry.yarnpkg.com/source-map-support/-/source-map-support-0.5.16.tgz#0ae069e7fe3ba7538c64c98515e35339eac5a042"
integrity sha512-efyLRJDr68D9hBBNIPWFjhpFzURh+KJykQwvMyW5UiZzYwoF6l4YMMDIJJEyFWxWCqfyxLzz6tSfUFR+kXXsVQ==
dependencies:
buffer-from "^1.0.0"
source-map "^0.6.0"
source-map-url@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/source-map-url/-/source-map-url-0.4.0.tgz#3e935d7ddd73631b97659956d55128e87b5084a3"
@ -4969,7 +5046,7 @@ source-map@^0.5.6:
resolved "https://registry.yarnpkg.com/source-map/-/source-map-0.5.7.tgz#8a039d2d1021d22d1ea14c80d8ea468ba2ef3fcc"
integrity sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=
source-map@^0.6.1, source-map@~0.6.1:
source-map@^0.6.0, source-map@^0.6.1, source-map@~0.6.1:
version "0.6.1"
resolved "https://registry.yarnpkg.com/source-map/-/source-map-0.6.1.tgz#74722af32e9614e9c287a8d0bbde48b5e2f1a263"
integrity sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==
@ -5410,6 +5487,17 @@ trim-off-newlines@^1.0.0:
resolved "https://registry.yarnpkg.com/trim-off-newlines/-/trim-off-newlines-1.0.1.tgz#9f9ba9d9efa8764c387698bcbfeb2c848f11adb3"
integrity sha1-n5up2e+odkw4dpi8v+sshI8RrbM=
ts-node@^8.5.4:
version "8.5.4"
resolved "https://registry.yarnpkg.com/ts-node/-/ts-node-8.5.4.tgz#a152add11fa19c221d0b48962c210cf467262ab2"
integrity sha512-izbVCRV68EasEPQ8MSIGBNK9dc/4sYJJKYA+IarMQct1RtEot6Xp0bXuClsbUSnKpg50ho+aOAx8en5c+y4OFw==
dependencies:
arg "^4.1.0"
diff "^4.0.1"
make-error "^1.1.1"
source-map-support "^0.5.6"
yn "^3.0.0"
tslib@^1.9.0:
version "1.10.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.10.0.tgz#c3c19f95973fb0a62973fb09d90d961ee43e5c8a"
@ -5434,6 +5522,11 @@ type-check@~0.3.2:
dependencies:
prelude-ls "~1.1.2"
type-detect@^4.0.0, type-detect@^4.0.5:
version "4.0.8"
resolved "https://registry.yarnpkg.com/type-detect/-/type-detect-4.0.8.tgz#7646fb5f18871cfbb7749e69bd39a6388eb7450c"
integrity sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==
type-fest@^0.3.0:
version "0.3.1"
resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.3.1.tgz#63d00d204e059474fe5e1b7c011112bbd1dc29e1"
@ -5449,6 +5542,11 @@ typedarray@^0.0.6:
resolved "https://registry.yarnpkg.com/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777"
integrity sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c=
typescript@^3.7.3:
version "3.7.3"
resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.7.3.tgz#b36840668a16458a7025b9eabfad11b66ab85c69"
integrity sha512-Mcr/Qk7hXqFBXMN7p7Lusj1ktCBydylfQM/FZCk5glCNQJrCUKPkMHdo9R0MTFWsC/4kPFvDS0fDPvukfCkFsw==
uglify-js@^3.1.4:
version "3.7.2"
resolved "https://registry.yarnpkg.com/uglify-js/-/uglify-js-3.7.2.tgz#cb1a601e67536e9ed094a92dd1e333459643d3f9"
@ -5772,3 +5870,8 @@ yargs@^14.2.2:
which-module "^2.0.0"
y18n "^4.0.0"
yargs-parser "^15.0.0"
yn@^3.0.0:
version "3.1.1"
resolved "https://registry.yarnpkg.com/yn/-/yn-3.1.1.tgz#1e87401a09d767c1d5eab26a6e4c185182d2eb50"
integrity sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==