Update types & move some configs around

This commit is contained in:
Brian M. Carlson 2019-12-27 02:55:18 +00:00
parent e034010811
commit 766e48f34a
7 changed files with 104 additions and 153 deletions

View File

@ -32,7 +32,7 @@ RUN apt-get update \
&& curl -sS https://dl.yarnpkg.com/$(lsb_release -is | tr '[:upper:]' '[:lower:]')/pubkey.gpg | apt-key add - 2>/dev/null \
&& echo "deb https://dl.yarnpkg.com/$(lsb_release -is | tr '[:upper:]' '[:lower:]')/ stable main" | tee /etc/apt/sources.list.d/yarn.list \
&& apt-get update \
&& apt-get -y install --no-install-recommends yarn tmux locales \
&& apt-get -y install --no-install-recommends yarn tmux locales postgresql \
#
# Install eslint globally
&& npm install -g eslint \

View File

@ -1,8 +1,18 @@
{
"plugins": ["node"],
"extends": ["standard", "eslint:recommended", "plugin:node/recommended"],
"plugins": [
"node"
],
"extends": [
"standard",
"eslint:recommended",
"plugin:node/recommended"
],
"ignorePatterns": [
"**/*.ts"
],
"parserOptions": {
"ecmaVersion": 2017
"ecmaVersion": 2017,
"sourceType": "module"
},
"env": {
"node": true,
@ -11,10 +21,13 @@
},
"rules": {
"space-before-function-paren": "off",
"node/no-unsupported-features/es-syntax": "off",
"node/no-unpublished-require": [
"error",
{
"allowModules": ["pg"]
"allowModules": [
"pg"
]
}
]
}

View File

@ -2,21 +2,20 @@
"name": "pg-packet-stream",
"version": "1.0.0",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"devDependencies": {
"@types/node": "^12.12.21",
"chunky": "^0.0.0",
"mocha": "^6.2.2",
"typescript": "^3.7.3"
},
"scripts": {
"test": "mocha -r ts-node/register src/**/*.test.ts"
},
"dependencies": {
"typescript": "^3.7.3",
"@types/chai": "^4.2.7",
"@types/mocha": "^5.2.7",
"chai": "^4.2.0",
"mocha": "^6.2.2",
"ts-node": "^8.5.4"
}
},
"scripts": {
"test": "mocha -r ts-node/register src/**/*.test.ts"
},
"dependencies": {}
}

View File

@ -1,103 +0,0 @@
import 'mocha';
import { PgPacketStream, Packet } from './'
import { expect } from 'chai'
import chunky from 'chunky'
const consume = async (stream: PgPacketStream, count: number): Promise<Packet[]> => {
const result: Packet[] = [];
return new Promise((resolve) => {
const read = () => {
stream.once('readable', () => {
let packet;
while (packet = stream.read()) {
result.push(packet)
}
if (result.length === count) {
resolve(result);
} else {
read()
}
})
}
read()
})
}
const emptyMessage = Buffer.from([0x0a, 0x00, 0x00, 0x00, 0x04])
const oneByteMessage = Buffer.from([0x0b, 0x00, 0x00, 0x00, 0x05, 0x0a])
const bigMessage = Buffer.from([0x0f, 0x00, 0x00, 0x00, 0x14, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e0, 0x0f])
describe.skip('PgPacketStream', () => {
it('should chunk a perfect input packet', async () => {
const stream = new PgPacketStream()
stream.write(Buffer.from([0x01, 0x00, 0x00, 0x00, 0x04]))
stream.end()
const buffers = await consume(stream, 1)
expect(buffers).to.have.length(1)
expect(buffers[0].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x04]))
});
it('should read 2 chunks into perfect input packet', async () => {
const stream = new PgPacketStream()
stream.write(Buffer.from([0x01, 0x00, 0x00, 0x00, 0x08]))
stream.write(Buffer.from([0x1, 0x2, 0x3, 0x4]))
stream.end()
const buffers = await consume(stream, 1)
expect(buffers).to.have.length(1)
expect(buffers[0].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x08, 0x1, 0x2, 0x3, 0x4]))
});
it('should read a bunch of big messages', async () => {
const stream = new PgPacketStream();
let totalBuffer = Buffer.allocUnsafe(0);
const num = 2;
for (let i = 0; i < 2; i++) {
totalBuffer = Buffer.concat([totalBuffer, bigMessage, bigMessage])
}
const chunks = chunky(totalBuffer)
for (const chunk of chunks) {
stream.write(chunk)
}
stream.end()
const messages = await consume(stream, num * 2)
expect(messages.map(x => x.code)).to.eql(new Array(num * 2).fill(0x0f))
})
it('should read multiple messages in a single chunk', async () => {
const stream = new PgPacketStream()
stream.write(Buffer.from([0x01, 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x04]))
stream.end()
const buffers = await consume(stream, 2)
expect(buffers).to.have.length(2)
expect(buffers[0].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x04]))
expect(buffers[1].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x04]))
});
it('should read multiple chunks into multiple packets', async () => {
const stream = new PgPacketStream()
stream.write(Buffer.from([0x01, 0x00, 0x00, 0x00, 0x05, 0x0a, 0x01, 0x00, 0x00, 0x00, 0x05, 0x0b]))
stream.write(Buffer.from([0x01, 0x00, 0x00]));
stream.write(Buffer.from([0x00, 0x06, 0x0c, 0x0d, 0x03, 0x00, 0x00, 0x00, 0x04]))
stream.end()
const buffers = await consume(stream, 4)
expect(buffers).to.have.length(4)
expect(buffers[0].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x05, 0x0a]))
expect(buffers[1].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x05, 0x0b]))
expect(buffers[2].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x06, 0x0c, 0x0d]))
expect(buffers[3].packet).to.deep.equal(Buffer.from([0x3, 0x00, 0x00, 0x00, 0x04]))
});
it('reads packet that spans multiple chunks', async () => {
const stream = new PgPacketStream()
stream.write(Buffer.from([0x0d, 0x00, 0x00, 0x00]))
stream.write(Buffer.from([0x09])) // length
stream.write(Buffer.from([0x0a, 0x0b, 0x0c, 0x0d]))
stream.write(Buffer.from([0x0a, 0x0b, 0x0c, 0x0d]))
stream.write(Buffer.from([0x0a, 0x0b, 0x0c, 0x0d]))
stream.end()
const buffers = await consume(stream, 1)
expect(buffers).to.have.length(1)
})
});

View File

@ -1,5 +1,5 @@
import { Transform, TransformCallback, TransformOptions } from 'stream';
import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage } from './messages';
import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password } from './messages';
import { BufferReader } from './BufferReader';
import assert from 'assert'
@ -63,7 +63,12 @@ export class PgPacketStream extends Transform {
}
public _transform(buffer: Buffer, encoding: string, callback: TransformCallback) {
const combinedBuffer: Buffer = this.remainingBuffer.byteLength ? Buffer.concat([this.remainingBuffer, buffer], this.remainingBuffer.length + buffer.length) : buffer;
let combinedBuffer = buffer;
if (this.remainingBuffer.byteLength) {
combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength);
this.remainingBuffer.copy(combinedBuffer)
buffer.copy(combinedBuffer, this.remainingBuffer.byteLength)
}
let offset = 0;
while ((offset + HEADER_LENGTH) <= combinedBuffer.byteLength) {
// code is 1 byte long - it identifies the message type
@ -125,9 +130,9 @@ export class PgPacketStream extends Transform {
case MessageCodes.BackendKeyData:
return this.parseBackendKeyData(offset, length, bytes);
case MessageCodes.ErrorMessage:
return this.parseErrorMessage(offset, length, bytes, 'error');
return this.parseErrorMessage(offset, length, bytes, MessageName.error);
case MessageCodes.NoticeMessage:
return this.parseErrorMessage(offset, length, bytes, 'notice');
return this.parseErrorMessage(offset, length, bytes, MessageName.notice);
case MessageCodes.RowDescriptionMessage:
return this.parseRowDescriptionMessage(offset, length, bytes);
case MessageCodes.CopyIn:
@ -142,7 +147,7 @@ export class PgPacketStream extends Transform {
}
public _flush(callback: TransformCallback) {
this._transform(Buffer.alloc(0), 'utf-i', callback)
this._transform(Buffer.alloc(0), 'utf-8', callback)
}
private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) {
@ -163,14 +168,14 @@ export class PgPacketStream extends Transform {
}
private parseCopyInMessage(offset: number, length: number, bytes: Buffer) {
return this.parseCopyMessage(offset, length, bytes, 'copyInResponse')
return this.parseCopyMessage(offset, length, bytes, MessageName.copyInResponse)
}
private parseCopyOutMessage(offset: number, length: number, bytes: Buffer) {
return this.parseCopyMessage(offset, length, bytes, 'copyOutResponse')
return this.parseCopyMessage(offset, length, bytes, MessageName.copyOutResponse)
}
private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: string) {
private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: MessageName) {
this.reader.setBuffer(offset, bytes);
const isBinary = this.reader.byte() !== 0;
const columnCount = this.reader.int16()
@ -244,8 +249,8 @@ export class PgPacketStream extends Transform {
this.reader.setBuffer(offset, bytes);
const code = this.reader.int32()
// TODO(bmc): maybe better types here
const message: any = {
name: 'authenticationOk',
const message: BackendMessage & any = {
name: MessageName.authenticationOk,
length,
};
@ -254,17 +259,18 @@ export class PgPacketStream extends Transform {
break;
case 3: // AuthenticationCleartextPassword
if (message.length === 8) {
message.name = 'authenticationCleartextPassword'
message.name = MessageName.authenticationCleartextPassword
}
break
case 5: // AuthenticationMD5Password
if (message.length === 12) {
message.name = 'authenticationMD5Password'
message.salt = this.reader.bytes(4);
message.name = MessageName.authenticationMD5Password
const salt = this.reader.bytes(4);
return new AuthenticationMD5Password(length, salt);
}
break
case 10: // AuthenticationSASL
message.name = 'authenticationSASL'
message.name = MessageName.authenticationSASL
message.mechanisms = []
let mechanism: string;
do {
@ -276,11 +282,11 @@ export class PgPacketStream extends Transform {
} while (mechanism)
break;
case 11: // AuthenticationSASLContinue
message.name = 'authenticationSASLContinue'
message.name = MessageName.authenticationSASLContinue
message.data = this.reader.string(length - 4)
break;
case 12: // AuthenticationSASLFinal
message.name = 'authenticationSASLFinal'
message.name = MessageName.authenticationSASLFinal
message.data = this.reader.string(length - 4)
break;
default:
@ -289,7 +295,7 @@ export class PgPacketStream extends Transform {
return message;
}
private parseErrorMessage(offset: number, length: number, bytes: Buffer, name: string) {
private parseErrorMessage(offset: number, length: number, bytes: Buffer, name: MessageName) {
this.reader.setBuffer(offset, bytes);
var fields: Record<string, string> = {}
var fieldType = this.reader.string(1)

View File

@ -1,47 +1,76 @@
export type Mode = 'text' | 'binary';
export type BackendMessage = {
name: string;
export const enum MessageName {
parseComplete = 'parseComplete',
bindComplete = 'bindComplete',
closeComplete = 'closeComplete',
noData = 'noData',
portalSuspended = 'portalSuspended',
replicationStart = 'replicationStart',
emptyQuery = 'emptyQuery',
copyDone = 'copyDone',
copyData = 'copyData',
rowDescription = 'rowDescription',
parameterStatus = 'parameterStatus',
backendKeyData = 'backendKeyData',
notification = 'notification',
readyForQuery = 'readyForQuery',
commandComplete = 'commandComplete',
dataRow = 'dataRow',
copyInResponse = 'copyInResponse',
copyOutResponse = 'copyOutResponse',
authenticationOk = 'authenticationOk',
authenticationMD5Password = 'authenticationMD5Password',
authenticationCleartextPassword = 'authenticationCleartextPassword',
authenticationSASL = 'authenticationSASL',
authenticationSASLContinue = 'authenticationSASLContinue',
authenticationSASLFinal = 'authenticationSASLFinal',
error = 'error',
notice = 'notice',
}
export interface BackendMessage {
name: MessageName;
length: number;
}
export const parseComplete: BackendMessage = {
name: 'parseComplete',
name: MessageName.parseComplete,
length: 5,
};
export const bindComplete: BackendMessage = {
name: 'bindComplete',
name: MessageName.bindComplete,
length: 5,
}
export const closeComplete: BackendMessage = {
name: 'closeComplete',
name: MessageName.closeComplete,
length: 5,
}
export const noData: BackendMessage = {
name: 'noData',
name: MessageName.noData,
length: 5
}
export const portalSuspended: BackendMessage = {
name: 'portalSuspended',
name: MessageName.portalSuspended,
length: 5,
}
export const replicationStart: BackendMessage = {
name: 'replicationStart',
name: MessageName.replicationStart,
length: 4,
}
export const emptyQuery: BackendMessage = {
name: 'emptyQuery',
name: MessageName.emptyQuery,
length: 4,
}
export const copyDone: BackendMessage = {
name: 'copyDone',
name: MessageName.copyDone,
length: 4,
}
@ -62,13 +91,13 @@ export class DatabaseError extends Error {
public file: string | undefined;
public line: string | undefined;
public routine: string | undefined;
constructor(message: string, public readonly length: number, public readonly name: string) {
constructor(message: string, public readonly length: number, public readonly name: MessageName) {
super(message)
}
}
export class CopyDataMessage {
public readonly name = 'copyData';
public readonly name = MessageName.copyData;
constructor(public readonly length: number, public readonly chunk: Buffer) {
}
@ -76,7 +105,7 @@ export class CopyDataMessage {
export class CopyResponse {
public readonly columnTypes: number[];
constructor(public readonly length: number, public readonly name: string, public readonly binary: boolean, columnCount: number) {
constructor(public readonly length: number, public readonly name: MessageName, public readonly binary: boolean, columnCount: number) {
this.columnTypes = new Array(columnCount);
}
}
@ -87,7 +116,7 @@ export class Field {
}
export class RowDescriptionMessage {
public readonly name: string = 'rowDescription';
public readonly name: MessageName = MessageName.rowDescription;
public readonly fields: Field[];
constructor(public readonly length: number, public readonly fieldCount: number) {
this.fields = new Array(this.fieldCount)
@ -95,39 +124,45 @@ export class RowDescriptionMessage {
}
export class ParameterStatusMessage {
public readonly name: string = 'parameterStatus';
public readonly name: MessageName = MessageName.parameterStatus;
constructor(public readonly length: number, public readonly parameterName: string, public readonly parameterValue: string) {
}
}
export class AuthenticationMD5Password implements BackendMessage {
public readonly name: MessageName = MessageName.authenticationMD5Password;
constructor(public readonly length: number, public readonly salt: Buffer) {
}
}
export class BackendKeyDataMessage {
public readonly name: string = 'backendKeyData';
public readonly name: MessageName = MessageName.backendKeyData;
constructor(public readonly length: number, public readonly processID: number, public readonly secretKey: number) {
}
}
export class NotificationResponseMessage {
public readonly name: string = 'notification';
public readonly name: MessageName = MessageName.notification;
constructor(public readonly length: number, public readonly processId: number, public readonly channel: string, public readonly payload: string) {
}
}
export class ReadyForQueryMessage {
public readonly name: string = 'readyForQuery';
public readonly name: MessageName = MessageName.readyForQuery;
constructor(public readonly length: number, public readonly status: string) {
}
}
export class CommandCompleteMessage {
public readonly name: string = 'commandComplete'
public readonly name: MessageName = MessageName.commandComplete
constructor(public readonly length: number, public readonly text: string) {
}
}
export class DataRowMessage {
public readonly fieldCount: number;
public readonly name: string = 'dataRow'
public readonly name: MessageName = MessageName.dataRow
constructor(public length: number, public fields: any[]) {
this.fieldCount = fields.length;
}

View File

@ -10,6 +10,7 @@
"sourceMap": true,
"outDir": "dist",
"baseUrl": ".",
"declaration": true,
"paths": {
"*": [
"node_modules/*",