diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index a00dabec..1827c3d1 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -73,10 +73,18 @@ const enum MessageCodes { export type MessageCallback = (msg: BackendMessage) => void +interface CombinedBuffer { + combinedBuffer: Buffer + combinedBufferOffset: number + combinedBufferLength: number + combinedBufferFullLength: number + reuseRemainingBuffer: boolean +} + export class Parser { - private buffer: Buffer = emptyBuffer - private bufferLength: number = 0 - private bufferOffset: number = 0 + private remainingBuffer: Buffer = emptyBuffer + private remainingBufferLength: number = 0 + private remainingBufferOffset: number = 0 private reader = new BufferReader() private mode: Mode @@ -88,65 +96,111 @@ export class Parser { } public parse(buffer: Buffer, callback: MessageCallback) { - this.mergeBuffer(buffer) - const bufferFullLength = this.bufferOffset + this.bufferLength - let offset = this.bufferOffset - while (offset + HEADER_LENGTH <= bufferFullLength) { + const { + combinedBuffer, + combinedBufferOffset, + combinedBufferLength, + reuseRemainingBuffer, + combinedBufferFullLength, + } = this.mergeBuffer(buffer) + let offset = combinedBufferOffset + while (offset + HEADER_LENGTH <= combinedBufferFullLength) { // code is 1 byte long - it identifies the message type - const code = this.buffer[offset] + const code = combinedBuffer[offset] + // length is 1 Uint32BE - it is the length of the message EXCLUDING the code - const length = this.buffer.readUInt32BE(offset + CODE_LENGTH) + const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH) + const fullMessageLength = CODE_LENGTH + length - if (fullMessageLength + offset <= bufferFullLength) { - const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer) + + if (fullMessageLength + offset <= combinedBufferFullLength) { + const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) callback(message) offset += fullMessageLength } else { break } } - if (offset === bufferFullLength) { - // No more use for the buffer - this.buffer = emptyBuffer - this.bufferLength = 0 - this.bufferOffset = 0 - } else { - // Adjust the cursors of remainingBuffer - this.bufferLength = bufferFullLength - offset - this.bufferOffset = offset - } + this.consumeBuffer({ + combinedBuffer, + combinedBufferOffset: offset, + combinedBufferLength, + reuseRemainingBuffer, + combinedBufferFullLength, + }) } - private mergeBuffer(buffer: Buffer): void { - if (this.bufferLength > 0) { - const newLength = this.bufferLength + buffer.byteLength - const newFullLength = newLength + this.bufferOffset - if (newFullLength > this.buffer.byteLength) { + private mergeBuffer(buffer: Buffer): CombinedBuffer { + let combinedBuffer = buffer + let combinedBufferLength = buffer.byteLength + let combinedBufferOffset = 0 + let reuseRemainingBuffer = this.remainingBufferLength > 0 + if (reuseRemainingBuffer) { + const newLength = this.remainingBufferLength + combinedBufferLength + const newFullLength = newLength + this.remainingBufferOffset + if (newFullLength > this.remainingBuffer.byteLength) { // We can't concat the new buffer with the remaining one let newBuffer: Buffer - if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) { + if (newLength <= this.remainingBuffer.byteLength && this.remainingBufferOffset >= this.remainingBufferLength) { // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer - newBuffer = this.buffer + newBuffer = this.remainingBuffer } else { // Allocate a new larger buffer - let newBufferLength = this.buffer.byteLength * 2 + let newBufferLength = this.remainingBuffer.byteLength * 2 while (newLength >= newBufferLength) { newBufferLength *= 2 } newBuffer = Buffer.allocUnsafe(newBufferLength) } // Move the remaining buffer to the new one - this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength) - this.buffer = newBuffer - this.bufferOffset = 0 + this.remainingBuffer.copy( + newBuffer, + 0, + this.remainingBufferOffset, + this.remainingBufferOffset + this.remainingBufferLength + ) + this.remainingBuffer = newBuffer + this.remainingBufferOffset = 0 } // Concat the new buffer with the remaining one - buffer.copy(this.buffer, this.bufferOffset + this.bufferLength) - this.bufferLength = newLength + buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength) + combinedBuffer = this.remainingBuffer + combinedBufferLength = this.remainingBufferLength = newLength + combinedBufferOffset = this.remainingBufferOffset + } + const combinedBufferFullLength = combinedBufferOffset + combinedBufferLength + return { + combinedBuffer, + combinedBufferOffset, + combinedBufferLength, + reuseRemainingBuffer, + combinedBufferFullLength, + } + } + + private consumeBuffer({ + combinedBufferOffset, + combinedBufferFullLength, + reuseRemainingBuffer, + combinedBuffer, + combinedBufferLength, + }: CombinedBuffer) { + if (combinedBufferOffset === combinedBufferFullLength) { + // No more use for the buffer + this.remainingBuffer = emptyBuffer + this.remainingBufferLength = 0 + this.remainingBufferOffset = 0 } else { - this.buffer = buffer - this.bufferOffset = 0 - this.bufferLength = buffer.byteLength + this.remainingBufferLength = combinedBufferFullLength - combinedBufferOffset + if (reuseRemainingBuffer) { + // Adjust the cursors of remainingBuffer + this.remainingBufferOffset = combinedBufferOffset + } else { + // To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer + this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2) + combinedBuffer.copy(this.remainingBuffer, 0, combinedBufferOffset) + this.remainingBufferOffset = 0 + } } } diff --git a/packages/pg/bench.js b/packages/pg/bench.js index c861c3ae..80c07dc1 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -1,4 +1,5 @@ const pg = require('./lib') +const pool = new pg.Pool() const params = { text: @@ -16,7 +17,7 @@ const seq = { } const exec = async (client, q) => { - await client.query({ + const result = await client.query({ text: q.text, values: q.values, rowMode: 'array', @@ -39,7 +40,6 @@ const run = async () => { const client = new pg.Client() await client.connect() await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)') - await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)') await bench(client, params, 1000) console.log('warmup done') const seconds = 5 @@ -61,21 +61,7 @@ const run = async () => { console.log('insert queries:', queries) console.log('qps', queries / seconds) console.log('on my laptop best so far seen 5799 qps') - - console.log('') - console.log('Warming up bytea test') - await client.query({ - text: 'INSERT INTO buf(name, data) VALUES ($1, $2)', - values: ['test', Buffer.allocUnsafe(104857600)], - }) - console.log('bytea warmup done') - const start = Date.now() - const results = await client.query('SELECT * FROM buf') - const time = Date.now() - start - console.log('bytea time:', time, 'ms') - console.log('bytea length:', results.rows[0].data.byteLength, 'bytes') - console.log('on my laptop best so far seen 1107ms and 104857600 bytes') - + console.log() await client.end() await client.end() }