mirror of
https://github.com/brianc/node-postgres.git
synced 2026-01-18 15:55:05 +00:00
fix: major performance issues with bytea performance #2240
This commit is contained in:
parent
64c78b0b0e
commit
bf53552a15
@ -73,10 +73,18 @@ const enum MessageCodes {
|
||||
|
||||
export type MessageCallback = (msg: BackendMessage) => void
|
||||
|
||||
interface CombinedBuffer {
|
||||
combinedBuffer: Buffer
|
||||
combinedBufferOffset: number
|
||||
combinedBufferLength: number
|
||||
combinedBufferFullLength: number
|
||||
reuseRemainingBuffer: boolean
|
||||
}
|
||||
|
||||
export class Parser {
|
||||
private buffer: Buffer = emptyBuffer
|
||||
private bufferLength: number = 0
|
||||
private bufferOffset: number = 0
|
||||
private remainingBuffer: Buffer = emptyBuffer
|
||||
private remainingBufferLength: number = 0
|
||||
private remainingBufferOffset: number = 0
|
||||
private reader = new BufferReader()
|
||||
private mode: Mode
|
||||
|
||||
@ -88,65 +96,111 @@ export class Parser {
|
||||
}
|
||||
|
||||
public parse(buffer: Buffer, callback: MessageCallback) {
|
||||
this.mergeBuffer(buffer)
|
||||
const bufferFullLength = this.bufferOffset + this.bufferLength
|
||||
let offset = this.bufferOffset
|
||||
while (offset + HEADER_LENGTH <= bufferFullLength) {
|
||||
const {
|
||||
combinedBuffer,
|
||||
combinedBufferOffset,
|
||||
combinedBufferLength,
|
||||
reuseRemainingBuffer,
|
||||
combinedBufferFullLength,
|
||||
} = this.mergeBuffer(buffer)
|
||||
let offset = combinedBufferOffset
|
||||
while (offset + HEADER_LENGTH <= combinedBufferFullLength) {
|
||||
// code is 1 byte long - it identifies the message type
|
||||
const code = this.buffer[offset]
|
||||
const code = combinedBuffer[offset]
|
||||
|
||||
// length is 1 Uint32BE - it is the length of the message EXCLUDING the code
|
||||
const length = this.buffer.readUInt32BE(offset + CODE_LENGTH)
|
||||
const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH)
|
||||
|
||||
const fullMessageLength = CODE_LENGTH + length
|
||||
if (fullMessageLength + offset <= bufferFullLength) {
|
||||
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer)
|
||||
|
||||
if (fullMessageLength + offset <= combinedBufferFullLength) {
|
||||
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer)
|
||||
callback(message)
|
||||
offset += fullMessageLength
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if (offset === bufferFullLength) {
|
||||
// No more use for the buffer
|
||||
this.buffer = emptyBuffer
|
||||
this.bufferLength = 0
|
||||
this.bufferOffset = 0
|
||||
} else {
|
||||
// Adjust the cursors of remainingBuffer
|
||||
this.bufferLength = bufferFullLength - offset
|
||||
this.bufferOffset = offset
|
||||
}
|
||||
this.consumeBuffer({
|
||||
combinedBuffer,
|
||||
combinedBufferOffset: offset,
|
||||
combinedBufferLength,
|
||||
reuseRemainingBuffer,
|
||||
combinedBufferFullLength,
|
||||
})
|
||||
}
|
||||
|
||||
private mergeBuffer(buffer: Buffer): void {
|
||||
if (this.bufferLength > 0) {
|
||||
const newLength = this.bufferLength + buffer.byteLength
|
||||
const newFullLength = newLength + this.bufferOffset
|
||||
if (newFullLength > this.buffer.byteLength) {
|
||||
private mergeBuffer(buffer: Buffer): CombinedBuffer {
|
||||
let combinedBuffer = buffer
|
||||
let combinedBufferLength = buffer.byteLength
|
||||
let combinedBufferOffset = 0
|
||||
let reuseRemainingBuffer = this.remainingBufferLength > 0
|
||||
if (reuseRemainingBuffer) {
|
||||
const newLength = this.remainingBufferLength + combinedBufferLength
|
||||
const newFullLength = newLength + this.remainingBufferOffset
|
||||
if (newFullLength > this.remainingBuffer.byteLength) {
|
||||
// We can't concat the new buffer with the remaining one
|
||||
let newBuffer: Buffer
|
||||
if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) {
|
||||
if (newLength <= this.remainingBuffer.byteLength && this.remainingBufferOffset >= this.remainingBufferLength) {
|
||||
// We can move the relevant part to the beginning of the buffer instead of allocating a new buffer
|
||||
newBuffer = this.buffer
|
||||
newBuffer = this.remainingBuffer
|
||||
} else {
|
||||
// Allocate a new larger buffer
|
||||
let newBufferLength = this.buffer.byteLength * 2
|
||||
let newBufferLength = this.remainingBuffer.byteLength * 2
|
||||
while (newLength >= newBufferLength) {
|
||||
newBufferLength *= 2
|
||||
}
|
||||
newBuffer = Buffer.allocUnsafe(newBufferLength)
|
||||
}
|
||||
// Move the remaining buffer to the new one
|
||||
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength)
|
||||
this.buffer = newBuffer
|
||||
this.bufferOffset = 0
|
||||
this.remainingBuffer.copy(
|
||||
newBuffer,
|
||||
0,
|
||||
this.remainingBufferOffset,
|
||||
this.remainingBufferOffset + this.remainingBufferLength
|
||||
)
|
||||
this.remainingBuffer = newBuffer
|
||||
this.remainingBufferOffset = 0
|
||||
}
|
||||
// Concat the new buffer with the remaining one
|
||||
buffer.copy(this.buffer, this.bufferOffset + this.bufferLength)
|
||||
this.bufferLength = newLength
|
||||
buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength)
|
||||
combinedBuffer = this.remainingBuffer
|
||||
combinedBufferLength = this.remainingBufferLength = newLength
|
||||
combinedBufferOffset = this.remainingBufferOffset
|
||||
}
|
||||
const combinedBufferFullLength = combinedBufferOffset + combinedBufferLength
|
||||
return {
|
||||
combinedBuffer,
|
||||
combinedBufferOffset,
|
||||
combinedBufferLength,
|
||||
reuseRemainingBuffer,
|
||||
combinedBufferFullLength,
|
||||
}
|
||||
}
|
||||
|
||||
private consumeBuffer({
|
||||
combinedBufferOffset,
|
||||
combinedBufferFullLength,
|
||||
reuseRemainingBuffer,
|
||||
combinedBuffer,
|
||||
combinedBufferLength,
|
||||
}: CombinedBuffer) {
|
||||
if (combinedBufferOffset === combinedBufferFullLength) {
|
||||
// No more use for the buffer
|
||||
this.remainingBuffer = emptyBuffer
|
||||
this.remainingBufferLength = 0
|
||||
this.remainingBufferOffset = 0
|
||||
} else {
|
||||
this.buffer = buffer
|
||||
this.bufferOffset = 0
|
||||
this.bufferLength = buffer.byteLength
|
||||
this.remainingBufferLength = combinedBufferFullLength - combinedBufferOffset
|
||||
if (reuseRemainingBuffer) {
|
||||
// Adjust the cursors of remainingBuffer
|
||||
this.remainingBufferOffset = combinedBufferOffset
|
||||
} else {
|
||||
// To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer
|
||||
this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2)
|
||||
combinedBuffer.copy(this.remainingBuffer, 0, combinedBufferOffset)
|
||||
this.remainingBufferOffset = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
const pg = require('./lib')
|
||||
const pool = new pg.Pool()
|
||||
|
||||
const params = {
|
||||
text:
|
||||
@ -16,7 +17,7 @@ const seq = {
|
||||
}
|
||||
|
||||
const exec = async (client, q) => {
|
||||
await client.query({
|
||||
const result = await client.query({
|
||||
text: q.text,
|
||||
values: q.values,
|
||||
rowMode: 'array',
|
||||
@ -39,7 +40,6 @@ const run = async () => {
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)')
|
||||
await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)')
|
||||
await bench(client, params, 1000)
|
||||
console.log('warmup done')
|
||||
const seconds = 5
|
||||
@ -61,21 +61,7 @@ const run = async () => {
|
||||
console.log('insert queries:', queries)
|
||||
console.log('qps', queries / seconds)
|
||||
console.log('on my laptop best so far seen 5799 qps')
|
||||
|
||||
console.log('')
|
||||
console.log('Warming up bytea test')
|
||||
await client.query({
|
||||
text: 'INSERT INTO buf(name, data) VALUES ($1, $2)',
|
||||
values: ['test', Buffer.allocUnsafe(104857600)],
|
||||
})
|
||||
console.log('bytea warmup done')
|
||||
const start = Date.now()
|
||||
const results = await client.query('SELECT * FROM buf')
|
||||
const time = Date.now() - start
|
||||
console.log('bytea time:', time, 'ms')
|
||||
console.log('bytea length:', results.rows[0].data.byteLength, 'bytes')
|
||||
console.log('on my laptop best so far seen 1107ms and 104857600 bytes')
|
||||
|
||||
console.log()
|
||||
await client.end()
|
||||
await client.end()
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user