More sync messages

This commit is contained in:
Brian M. Carlson 2019-12-19 07:48:51 -06:00
parent 577e6443ec
commit 143ed2901d
2 changed files with 94 additions and 44 deletions

43
packages/pg/bench.js Normal file
View File

@ -0,0 +1,43 @@
const pg = require("./lib");
const pool = new pg.Pool()
const q = {
text:
"select typname, typnamespace, typowner, typlen, typbyval, typcategory, typispreferred, typisdefined, typdelim, typrelid, typelem, typarray from pg_type where typtypmod = $1 and typisdefined = $2",
values: [-1, true]
};
const exec = async client => {
const result = await client.query({
text: q.text,
values: q.values,
rowMode: "array"
});
};
const bench = async (client, time) => {
let start = Date.now();
let count = 0;
while (true) {
await exec(client);
count++;
if (Date.now() - start > time) {
return count;
}
}
};
const run = async () => {
const client = new pg.Client();
await client.connect();
await bench(client, 1000);
console.log("warmup done");
const seconds = 5;
const queries = await bench(client, seconds * 1000);
console.log("queries:", queries);
console.log("qps", queries / seconds);
console.log("on my laptop best so far seen 713 qps")
await client.end();
};
run().catch(e => console.error(e) || process.exit(-1));

View File

@ -16,10 +16,12 @@ var Reader = require('packet-reader')
var TEXT_MODE = 0
var BINARY_MODE = 1
var Connection = function (config) {
EventEmitter.call(this)
config = config || {}
this.stream = config.stream || new net.Socket()
this.stream.setNoDelay(true)
this._keepAlive = config.keepAlive
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
this.lastBuffer = false
@ -87,7 +89,8 @@ Connection.prototype.connect = function (port, host) {
return self.emit('error', new Error('The server does not support SSL connections'))
case 'S': // Server supports SSL connections, continue with a secure connection
break
default: // Any other response byte, including 'E' (ErrorResponse) indicating a server error
default:
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
return self.emit('error', new Error('There was an error establishing an SSL connection'))
}
var tls = require('tls')
@ -136,8 +139,9 @@ Connection.prototype.attachListeners = function (stream) {
Connection.prototype.requestSsl = function () {
var bodyBuffer = this.writer
.addInt16(0x04D2)
.addInt16(0x162F).flush()
.addInt16(0x04d2)
.addInt16(0x162f)
.flush()
var length = bodyBuffer.length + 4
@ -149,9 +153,7 @@ Connection.prototype.requestSsl = function () {
}
Connection.prototype.startup = function (config) {
var writer = this.writer
.addInt16(3)
.addInt16(0)
var writer = this.writer.addInt16(3).addInt16(0)
Object.keys(config).forEach(function (key) {
var val = config[key]
@ -206,8 +208,7 @@ Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initi
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
// 0x70 = 'p'
this.writer
.addString(additionalData)
this.writer.addString(additionalData)
this._send(0x70)
}
@ -216,11 +217,7 @@ Connection.prototype._send = function (code, more) {
if (!this.stream.writable) {
return false
}
if (more === true) {
this.writer.addHeader(code)
} else {
return this.stream.write(this.writer.flush(code))
}
return this.stream.write(this.writer.flush(code))
}
Connection.prototype.query = function (text) {
@ -229,8 +226,7 @@ Connection.prototype.query = function (text) {
}
// send parse message
// "more" === true to buffer the message until flush() is called
Connection.prototype.parse = function (query, more) {
Connection.prototype.parse = function (query) {
// expect something like this:
// { name: 'queryName',
// text: 'select * from blah',
@ -257,12 +253,13 @@ Connection.prototype.parse = function (query, more) {
}
var code = 0x50
this._send(code, more)
this._send(code)
this.flush()
}
// send bind message
// "more" === true to buffer the message until flush() is called
Connection.prototype.bind = function (config, more) {
Connection.prototype.bind = function (config) {
// normalize config
config = config || {}
config.portal = config.portal || ''
@ -271,13 +268,17 @@ Connection.prototype.bind = function (config, more) {
var values = config.values || []
var len = values.length
var useBinary = false
for (var j = 0; j < len; j++) { useBinary |= values[j] instanceof Buffer }
var buffer = this.writer
.addCString(config.portal)
.addCString(config.statement)
if (!useBinary) { buffer.addInt16(0) } else {
for (var j = 0; j < len; j++) {
useBinary |= values[j] instanceof Buffer
}
var buffer = this.writer.addCString(config.portal).addCString(config.statement)
if (!useBinary) {
buffer.addInt16(0)
} else {
buffer.addInt16(len)
for (j = 0; j < len; j++) { buffer.addInt16(values[j] instanceof Buffer) }
for (j = 0; j < len; j++) {
buffer.addInt16(values[j] instanceof Buffer)
}
}
buffer.addInt16(len)
for (var i = 0; i < len; i++) {
@ -300,59 +301,63 @@ Connection.prototype.bind = function (config, more) {
buffer.addInt16(0) // format codes to use text
}
// 0x42 = 'B'
this._send(0x42, more)
this._send(0x42)
this.flush()
}
// send execute message
// "more" === true to buffer the message until flush() is called
Connection.prototype.execute = function (config, more) {
Connection.prototype.execute = function (config) {
config = config || {}
config.portal = config.portal || ''
config.rows = config.rows || ''
this.writer
.addCString(config.portal)
.addInt32(config.rows)
this.writer.addCString(config.portal).addInt32(config.rows)
// 0x45 = 'E'
this._send(0x45, more)
this._send(0x45)
this.flush()
}
var emptyBuffer = Buffer.alloc(0)
const flushBuffer = Buffer.from([0x48, 0x00, 0x00, 0x00, 0x04])
Connection.prototype.flush = function () {
// 0x48 = 'H'
this.writer.add(emptyBuffer)
this._send(0x48)
if (this.stream.writable) {
this.stream.write(flushBuffer)
}
}
const syncBuffer = Buffer.from([0x53, 0x00, 0x00, 0x00, 0x04])
Connection.prototype.sync = function () {
// clear out any pending data in the writer
this.writer.flush(0)
this.writer.add(emptyBuffer)
this._ending = true
this._send(0x53)
// clear out any pending data in the writer
this.writer.clear()
if (this.stream.writable) {
this.stream.write(syncBuffer)
this.stream.write(flushBuffer)
}
}
const END_BUFFER = Buffer.from([0x58, 0x00, 0x00, 0x00, 0x04])
Connection.prototype.end = function () {
// 0x58 = 'X'
this.writer.add(emptyBuffer)
this.writer.clear()
this._ending = true
return this.stream.write(END_BUFFER, () => {
this.stream.end()
})
}
Connection.prototype.close = function (msg, more) {
Connection.prototype.close = function (msg) {
this.writer.addCString(msg.type + (msg.name || ''))
this._send(0x43, more)
this._send(0x43)
}
Connection.prototype.describe = function (msg, more) {
Connection.prototype.describe = function (msg) {
this.writer.addCString(msg.type + (msg.name || ''))
this._send(0x44, more)
this._send(0x44)
this.flush()
}
Connection.prototype.sendCopyFromChunk = function (chunk) {
@ -376,8 +381,9 @@ var Message = function (name, length) {
Connection.prototype.parseMessage = function (buffer) {
this.offset = 0
var length = buffer.length + 4
switch (this._reader.header) {
const length = buffer.length + 4;
const code = this._reader.header;
switch (code) {
case 0x52: // R
return this.parseR(buffer, length)
@ -441,6 +447,7 @@ Connection.prototype.parseMessage = function (buffer) {
case 0x64: // d
return this.parsed(buffer, length)
}
console.log('could not parse', packet)
}
Connection.prototype.parseR = function (buffer, length) {