import buffers from './testing/test-buffers';
import BufferList from './testing/buffer-list';
import { parse } from '.';
import assert from 'assert';
import { PassThrough } from 'stream';
import { BackendMessage } from './messages';

// ---------------------------------------------------------------------------
// Fixture buffers produced by the shared test-buffer helpers.
// ---------------------------------------------------------------------------
const authOkBuffer = buffers.authenticationOk();
const paramStatusBuffer = buffers.parameterStatus('client_encoding', 'UTF8');
const readyForQueryBuffer = buffers.readyForQuery();
const backendKeyDataBuffer = buffers.backendKeyData(1, 2);
const commandCompleteBuffer = buffers.commandComplete('SELECT 3');
const parseCompleteBuffer = buffers.parseComplete();
const bindCompleteBuffer = buffers.bindComplete();
const portalSuspendedBuffer = buffers.portalSuspended();

/**
 * Appends one field description to `bufferList`, using `offset` and the four
 * integers that follow it as the numeric column values.
 *
 * NOTE(review): nothing in this file as shown calls this helper; kept in case
 * it is still wanted for hand-built fixtures.
 */
const addRow = function (bufferList: BufferList, name: string, offset: number) {
  return bufferList
    .addCString(name) // field name
    .addInt32(offset++) // table id
    .addInt16(offset++) // attribute of column number
    .addInt32(offset++) // objectId of field's data type
    .addInt16(offset++) // datatype size
    .addInt32(offset++) // type modifier
    .addInt16(0); // format code, 0 => text
};

const row1 = {
  name: 'id',
  tableID: 1,
  attributeNumber: 2,
  dataTypeID: 3,
  dataTypeSize: 4,
  typeModifier: 5,
  formatCode: 0,
};
const oneRowDescBuff = buffers.rowDescription([row1]);
// The one-row buffer above has already been built, so renaming the field here
// only affects the two-row buffer below.
row1.name = 'bang';
const twoRowBuf = buffers.rowDescription([
  row1,
  {
    name: 'whoah',
    tableID: 10,
    attributeNumber: 11,
    dataTypeID: 12,
    dataTypeSize: 13,
    typeModifier: 14,
    formatCode: 0,
  },
]);

const emptyRowFieldBuf = buffers.dataRow([]);
const oneFieldBuf = buffers.dataRow(['test']);

// ---------------------------------------------------------------------------
// Expected parse results. Only the keys listed here are compared by
// `testForMessage`; extra properties on the parsed message are ignored.
// ---------------------------------------------------------------------------
const expectedAuthenticationOkayMessage = {
  name: 'authenticationOk',
  length: 8,
};

const expectedParameterStatusMessage = {
  name: 'parameterStatus',
  parameterName: 'client_encoding',
  parameterValue: 'UTF8',
  length: 25,
};

const expectedBackendKeyDataMessage = {
  name: 'backendKeyData',
  processID: 1,
  secretKey: 2,
};

const expectedReadyForQueryMessage = {
  name: 'readyForQuery',
  length: 5,
  status: 'I',
};

const expectedCommandCompleteMessage = {
  name: 'commandComplete',
  length: 13,
  text: 'SELECT 3',
};

const emptyRowDescriptionBuffer = new BufferList()
  .addInt16(0) // number of fields
  .join(true, 'T');

const expectedEmptyRowDescriptionMessage = {
  name: 'rowDescription',
  length: 6,
  fieldCount: 0,
  fields: [],
};

const expectedOneRowMessage = {
  name: 'rowDescription',
  length: 27,
  fieldCount: 1,
  fields: [
    {
      name: 'id',
      tableID: 1,
      columnID: 2,
      dataTypeID: 3,
      dataTypeSize: 4,
      dataTypeModifier: 5,
      format: 'text',
    },
  ],
};

const expectedTwoRowMessage = {
  name: 'rowDescription',
  length: 53,
  fieldCount: 2,
  fields: [
    {
      name: 'bang',
      tableID: 1,
      columnID: 2,
      dataTypeID: 3,
      dataTypeSize: 4,
      dataTypeModifier: 5,
      format: 'text',
    },
    {
      name: 'whoah',
      tableID: 10,
      columnID: 11,
      dataTypeID: 12,
      dataTypeSize: 13,
      dataTypeModifier: 14,
      format: 'text',
    },
  ],
};

/**
 * Registers a test that parses `buffer` and compares the first emitted message
 * against `expectedMessage`, one key at a time.
 *
 * The comparison is intentionally the loose `assert.deepEqual`: expected values
 * are plain object literals, and a strict comparison could fail on prototype
 * differences if the parser returns class instances.
 *
 * @param buffer - Raw bytes handed to the parser in a single write.
 * @param expectedMessage - Subset of properties the parsed message must have;
 *   its `name` is also used in the test title.
 */
const testForMessage = function (buffer: Buffer, expectedMessage: any) {
  it('recieves and parses ' + expectedMessage.name, async () => {
    const messages = await parseBuffers([buffer]);
    const [firstMessage] = messages;
    // Fail with a readable message instead of a TypeError on property access.
    assert.ok(firstMessage, 'parser emitted no messages');
    for (const key in expectedMessage) {
      assert.deepEqual((firstMessage as any)[key], expectedMessage[key]);
    }
  });
};

const plainPasswordBuffer = buffers.authenticationCleartextPassword();
const md5PasswordBuffer = buffers.authenticationMD5Password();
const SASLBuffer = buffers.authenticationSASL();
const SASLContinueBuffer = buffers.authenticationSASLContinue();
const SASLFinalBuffer = buffers.authenticationSASLFinal();

const expectedPlainPasswordMessage = {
  name: 'authenticationCleartextPassword',
};

const expectedMD5PasswordMessage = {
  name: 'authenticationMD5Password',
  salt: Buffer.from([1, 2, 3, 4]),
};

const expectedSASLMessage = {
  name: 'authenticationSASL',
  mechanisms: ['SCRAM-SHA-256'],
};

const expectedSASLContinueMessage = {
  name: 'authenticationSASLContinue',
  data: 'data',
};

const expectedSASLFinalMessage = {
  name: 'authenticationSASLFinal',
  data: 'data',
};

const notificationResponseBuffer = buffers.notification(4, 'hi', 'boom');
const expectedNotificationResponseMessage = {
  name: 'notification',
  processId: 4,
  channel: 'hi',
  payload: 'boom',
};

/**
 * Feeds `chunks` to the parser through a `PassThrough` stream, one write per
 * chunk, ends the stream, and returns every message reported to the callback.
 *
 * @param chunks - Buffers written to the stream in order.
 * @returns The messages collected once the awaited `parse` call settles.
 */
const parseBuffers = async (chunks: Buffer[]): Promise<BackendMessage[]> => {
  const stream = new PassThrough();
  for (const chunk of chunks) {
    stream.write(chunk);
  }
  stream.end();
  const msgs: BackendMessage[] = [];
  await parse(stream, (msg) => msgs.push(msg));
  return msgs;
};

/**
 * Copies `source` into two fresh buffers so that the second one holds the last
 * `tailLength` bytes and the first one holds everything before them.
 *
 * @param source - Buffer to split; it is not modified.
 * @param tailLength - Number of bytes that go into the second buffer.
 * @returns `[head, tail]`, whose concatenation equals `source`.
 */
const splitBuffer = function (source: Buffer, tailLength: number): [Buffer, Buffer] {
  const head = Buffer.alloc(source.length - tailLength);
  const tail = Buffer.alloc(source.length - head.length);
  source.copy(head, 0, 0);
  source.copy(tail, 0, head.length);
  return [head, tail];
};

describe('PgPacketStream', function () {
  testForMessage(authOkBuffer, expectedAuthenticationOkayMessage);
  testForMessage(plainPasswordBuffer, expectedPlainPasswordMessage);
  testForMessage(md5PasswordBuffer, expectedMD5PasswordMessage);
  testForMessage(SASLBuffer, expectedSASLMessage);
  testForMessage(SASLContinueBuffer, expectedSASLContinueMessage);
  testForMessage(SASLFinalBuffer, expectedSASLFinalMessage);

  testForMessage(paramStatusBuffer, expectedParameterStatusMessage);
  testForMessage(backendKeyDataBuffer, expectedBackendKeyDataMessage);
  testForMessage(readyForQueryBuffer, expectedReadyForQueryMessage);
  testForMessage(commandCompleteBuffer, expectedCommandCompleteMessage);
  testForMessage(notificationResponseBuffer, expectedNotificationResponseMessage);
  testForMessage(buffers.emptyQuery(), {
    name: 'emptyQuery',
    length: 4,
  });

  testForMessage(Buffer.from([0x6e, 0, 0, 0, 4]), {
    name: 'noData',
  });

  describe('rowDescription messages', function () {
    testForMessage(emptyRowDescriptionBuffer, expectedEmptyRowDescriptionMessage);
    testForMessage(oneRowDescBuff, expectedOneRowMessage);
    testForMessage(twoRowBuf, expectedTwoRowMessage);
  });

  describe('parsing rows', function () {
    describe('parsing empty row', function () {
      testForMessage(emptyRowFieldBuf, {
        name: 'dataRow',
        fieldCount: 0,
      });
    });

    describe('parsing data row with fields', function () {
      testForMessage(oneFieldBuf, {
        name: 'dataRow',
        fieldCount: 1,
        fields: ['test'],
      });
    });
  });

  describe('notice message', function () {
    // this uses the same logic as error message
    const buff = buffers.notice([{ type: 'C', value: 'code' }]);
    testForMessage(buff, {
      name: 'notice',
      code: 'code',
    });
  });

  testForMessage(buffers.error([]), {
    name: 'error',
  });

  describe('with all the fields', function () {
    const buffer = buffers.error([
      {
        type: 'S',
        value: 'ERROR',
      },
      {
        type: 'C',
        value: 'code',
      },
      {
        type: 'M',
        value: 'message',
      },
      {
        type: 'D',
        value: 'details',
      },
      {
        type: 'H',
        value: 'hint',
      },
      {
        type: 'P',
        value: '100',
      },
      {
        type: 'p',
        value: '101',
      },
      {
        type: 'q',
        value: 'query',
      },
      {
        type: 'W',
        value: 'where',
      },
      {
        type: 'F',
        value: 'file',
      },
      {
        type: 'L',
        value: 'line',
      },
      {
        type: 'R',
        value: 'routine',
      },
      {
        type: 'Z', // ignored
        value: 'alsdkf',
      },
    ]);

    testForMessage(buffer, {
      name: 'error',
      severity: 'ERROR',
      code: 'code',
      message: 'message',
      detail: 'details',
      hint: 'hint',
      position: '100',
      internalPosition: '101',
      internalQuery: 'query',
      where: 'where',
      file: 'file',
      line: 'line',
      routine: 'routine',
    });
  });

  testForMessage(parseCompleteBuffer, {
    name: 'parseComplete',
  });

  testForMessage(bindCompleteBuffer, {
    name: 'bindComplete',
  });

  testForMessage(buffers.closeComplete(), {
    name: 'closeComplete',
  });

  describe('parses portal suspended message', function () {
    testForMessage(portalSuspendedBuffer, {
      name: 'portalSuspended',
    });
  });

  describe('parses replication start message', function () {
    testForMessage(Buffer.from([0x57, 0x00, 0x00, 0x00, 0x04]), {
      name: 'replicationStart',
      length: 4,
    });
  });

  describe('copy', () => {
    testForMessage(buffers.copyIn(0), {
      name: 'copyInResponse',
      length: 7,
      binary: false,
      columnTypes: [],
    });

    testForMessage(buffers.copyIn(2), {
      name: 'copyInResponse',
      length: 11,
      binary: false,
      columnTypes: [0, 1],
    });

    testForMessage(buffers.copyOut(0), {
      name: 'copyOutResponse',
      length: 7,
      binary: false,
      columnTypes: [],
    });

    testForMessage(buffers.copyOut(3), {
      name: 'copyOutResponse',
      length: 13,
      binary: false,
      columnTypes: [0, 1, 2],
    });

    testForMessage(buffers.copyDone(), {
      name: 'copyDone',
      length: 4,
    });

    testForMessage(buffers.copyData(Buffer.from([5, 6, 7])), {
      name: 'copyData',
      length: 7,
      chunk: Buffer.from([5, 6, 7]),
    });
  });

  // Since the stream can divide incoming TCP packets at any byte, we need to
  // make sure a message is parsed correctly wherever the split falls.
  describe('split buffer, single message parsing', function () {
    const fullBuffer = buffers.dataRow([null, 'bang', 'zug zug', null, '!']);

    /** Checks that the first parsed message carries the five fields of `fullBuffer`. */
    const assertExpectedFields = function (messages: BackendMessage[]) {
      const message = messages[0] as any;
      assert.equal(message.fields.length, 5);
      assert.equal(message.fields[0], null);
      assert.equal(message.fields[1], 'bang');
      assert.equal(message.fields[2], 'zug zug');
      assert.equal(message.fields[3], null);
      assert.equal(message.fields[4], '!');
    };

    it('parses when full buffer comes in', async function () {
      const messages = await parseBuffers([fullBuffer]);
      assertExpectedFields(messages);
    });

    /**
     * Parses `fullBuffer` delivered as two writes, the second one holding the
     * last `split` bytes, and verifies the resulting row.
     */
    const testMessageReceivedAfterSplitAt = async function (split: number) {
      const [firstBuffer, secondBuffer] = splitBuffer(fullBuffer, split);
      // Feed the two halves, not the intact buffer, so the split is exercised.
      const messages = await parseBuffers([firstBuffer, secondBuffer]);
      assertExpectedFields(messages);
    };

    // Each test returns its promise so Mocha waits for the assertions and
    // reports their failures instead of letting them become unhandled rejections.
    it('parses when split in the middle', function () {
      return testMessageReceivedAfterSplitAt(6);
    });

    it('parses when split at end', function () {
      return testMessageReceivedAfterSplitAt(2);
    });

    it('parses when split at beginning', function () {
      return Promise.all([
        testMessageReceivedAfterSplitAt(fullBuffer.length - 2),
        testMessageReceivedAfterSplitAt(fullBuffer.length - 1),
        testMessageReceivedAfterSplitAt(fullBuffer.length - 5),
      ]);
    });
  });

  describe('split buffer, multiple message parsing', function () {
    const dataRowBuffer = buffers.dataRow(['!']);
    const readyForQueryBuffer = buffers.readyForQuery();
    // One contiguous buffer holding the data row followed by ready-for-query.
    const fullBuffer = Buffer.alloc(dataRowBuffer.length + readyForQueryBuffer.length);
    dataRowBuffer.copy(fullBuffer, 0, 0);
    readyForQueryBuffer.copy(fullBuffer, dataRowBuffer.length, 0);

    /** Checks that exactly the data row and the ready-for-query message were parsed, in order. */
    const verifyMessages = function (messages: any[]) {
      assert.strictEqual(messages.length, 2);
      assert.deepEqual(messages[0], {
        name: 'dataRow',
        fieldCount: 1,
        length: 11,
        fields: ['!'],
      });
      assert.equal(messages[0].fields[0], '!');
      assert.deepEqual(messages[1], {
        name: 'readyForQuery',
        length: 5,
        status: 'I',
      });
    };

    // sanity check
    it('recieves both messages when packet is not split', async function () {
      const messages = await parseBuffers([fullBuffer]);
      verifyMessages(messages);
    });

    /** Parses `fullBuffer` as two writes, the second holding the last `split` bytes. */
    const splitAndVerifyTwoMessages = async function (split: number) {
      const [firstBuffer, secondBuffer] = splitBuffer(fullBuffer, split);
      const messages = await parseBuffers([firstBuffer, secondBuffer]);
      verifyMessages(messages);
    };

    describe('recieves both messages when packet is split', function () {
      it('in the middle', function () {
        return splitAndVerifyTwoMessages(11);
      });

      it('at the front', function () {
        return Promise.all([
          splitAndVerifyTwoMessages(fullBuffer.length - 1),
          splitAndVerifyTwoMessages(fullBuffer.length - 4),
          splitAndVerifyTwoMessages(fullBuffer.length - 6),
        ]);
      });

      it('at the end', function () {
        return Promise.all([splitAndVerifyTwoMessages(8), splitAndVerifyTwoMessages(1)]);
      });
    });
  });
});