diff --git a/packages/pg/bench.js b/packages/pg/bench.js new file mode 100644 index 00000000..7a7084ae --- /dev/null +++ b/packages/pg/bench.js @@ -0,0 +1,43 @@ +const pg = require("./lib"); +const pool = new pg.Pool() + +const q = { + text: + "select typname, typnamespace, typowner, typlen, typbyval, typcategory, typispreferred, typisdefined, typdelim, typrelid, typelem, typarray from pg_type where typtypmod = $1 and typisdefined = $2", + values: [-1, true] +}; + +const exec = async client => { + const result = await client.query({ + text: q.text, + values: q.values, + rowMode: "array" + }); +}; + +const bench = async (client, time) => { + let start = Date.now(); + let count = 0; + while (true) { + await exec(client); + count++; + if (Date.now() - start > time) { + return count; + } + } +}; + +const run = async () => { + const client = new pg.Client(); + await client.connect(); + await bench(client, 1000); + console.log("warmup done"); + const seconds = 5; + const queries = await bench(client, seconds * 1000); + console.log("queries:", queries); + console.log("qps", queries / seconds); + console.log("on my laptop best so far seen 713 qps") + await client.end(); +}; + +run().catch(e => console.error(e) || process.exit(-1)); diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 4fae9208..c6035e50 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -16,10 +16,12 @@ var Reader = require('packet-reader') var TEXT_MODE = 0 var BINARY_MODE = 1 + var Connection = function (config) { EventEmitter.call(this) config = config || {} this.stream = config.stream || new net.Socket() + this.stream.setNoDelay(true) this._keepAlive = config.keepAlive this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis this.lastBuffer = false @@ -87,7 +89,8 @@ Connection.prototype.connect = function (port, host) { return self.emit('error', new Error('The server does not support SSL connections')) case 'S': // Server supports SSL connections, continue with a secure connection break - default: // Any other response byte, including 'E' (ErrorResponse) indicating a server error + default: + // Any other response byte, including 'E' (ErrorResponse) indicating a server error return self.emit('error', new Error('There was an error establishing an SSL connection')) } var tls = require('tls') @@ -136,8 +139,9 @@ Connection.prototype.attachListeners = function (stream) { Connection.prototype.requestSsl = function () { var bodyBuffer = this.writer - .addInt16(0x04D2) - .addInt16(0x162F).flush() + .addInt16(0x04d2) + .addInt16(0x162f) + .flush() var length = bodyBuffer.length + 4 @@ -149,9 +153,7 @@ Connection.prototype.requestSsl = function () { } Connection.prototype.startup = function (config) { - var writer = this.writer - .addInt16(3) - .addInt16(0) + var writer = this.writer.addInt16(3).addInt16(0) Object.keys(config).forEach(function (key) { var val = config[key] @@ -206,8 +208,7 @@ Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initi Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) { // 0x70 = 'p' - this.writer - .addString(additionalData) + this.writer.addString(additionalData) this._send(0x70) } @@ -216,11 +217,7 @@ Connection.prototype._send = function (code, more) { if (!this.stream.writable) { return false } - if (more === true) { - this.writer.addHeader(code) - } else { - return this.stream.write(this.writer.flush(code)) - } + return this.stream.write(this.writer.flush(code)) } Connection.prototype.query = function (text) { @@ -229,8 +226,7 @@ Connection.prototype.query = function (text) { } // send parse message -// "more" === true to buffer the message until flush() is called -Connection.prototype.parse = function (query, more) { +Connection.prototype.parse = function (query) { // expect something like this: // { name: 'queryName', // text: 'select * from blah', @@ -257,12 +253,13 @@ Connection.prototype.parse = function (query, more) { } var code = 0x50 - this._send(code, more) + this._send(code) + this.flush() } // send bind message // "more" === true to buffer the message until flush() is called -Connection.prototype.bind = function (config, more) { +Connection.prototype.bind = function (config) { // normalize config config = config || {} config.portal = config.portal || '' @@ -271,13 +268,17 @@ Connection.prototype.bind = function (config, more) { var values = config.values || [] var len = values.length var useBinary = false - for (var j = 0; j < len; j++) { useBinary |= values[j] instanceof Buffer } - var buffer = this.writer - .addCString(config.portal) - .addCString(config.statement) - if (!useBinary) { buffer.addInt16(0) } else { + for (var j = 0; j < len; j++) { + useBinary |= values[j] instanceof Buffer + } + var buffer = this.writer.addCString(config.portal).addCString(config.statement) + if (!useBinary) { + buffer.addInt16(0) + } else { buffer.addInt16(len) - for (j = 0; j < len; j++) { buffer.addInt16(values[j] instanceof Buffer) } + for (j = 0; j < len; j++) { + buffer.addInt16(values[j] instanceof Buffer) + } } buffer.addInt16(len) for (var i = 0; i < len; i++) { @@ -300,59 +301,63 @@ Connection.prototype.bind = function (config, more) { buffer.addInt16(0) // format codes to use text } // 0x42 = 'B' - this._send(0x42, more) + this._send(0x42) + this.flush() } // send execute message // "more" === true to buffer the message until flush() is called -Connection.prototype.execute = function (config, more) { +Connection.prototype.execute = function (config) { config = config || {} config.portal = config.portal || '' config.rows = config.rows || '' - this.writer - .addCString(config.portal) - .addInt32(config.rows) + this.writer.addCString(config.portal).addInt32(config.rows) // 0x45 = 'E' - this._send(0x45, more) + this._send(0x45) + this.flush() } var emptyBuffer = Buffer.alloc(0) +const flushBuffer = Buffer.from([0x48, 0x00, 0x00, 0x00, 0x04]) Connection.prototype.flush = function () { - // 0x48 = 'H' - this.writer.add(emptyBuffer) - this._send(0x48) + if (this.stream.writable) { + this.stream.write(flushBuffer) + } } +const syncBuffer = Buffer.from([0x53, 0x00, 0x00, 0x00, 0x04]) Connection.prototype.sync = function () { - // clear out any pending data in the writer - this.writer.flush(0) - - this.writer.add(emptyBuffer) this._ending = true - this._send(0x53) + // clear out any pending data in the writer + this.writer.clear() + if (this.stream.writable) { + this.stream.write(syncBuffer) + this.stream.write(flushBuffer) + } } const END_BUFFER = Buffer.from([0x58, 0x00, 0x00, 0x00, 0x04]) Connection.prototype.end = function () { // 0x58 = 'X' - this.writer.add(emptyBuffer) + this.writer.clear() this._ending = true return this.stream.write(END_BUFFER, () => { this.stream.end() }) } -Connection.prototype.close = function (msg, more) { +Connection.prototype.close = function (msg) { this.writer.addCString(msg.type + (msg.name || '')) - this._send(0x43, more) + this._send(0x43) } -Connection.prototype.describe = function (msg, more) { +Connection.prototype.describe = function (msg) { this.writer.addCString(msg.type + (msg.name || '')) - this._send(0x44, more) + this._send(0x44) + this.flush() } Connection.prototype.sendCopyFromChunk = function (chunk) { @@ -376,8 +381,9 @@ var Message = function (name, length) { Connection.prototype.parseMessage = function (buffer) { this.offset = 0 - var length = buffer.length + 4 - switch (this._reader.header) { + const length = buffer.length + 4; + const code = this._reader.header; + switch (code) { case 0x52: // R return this.parseR(buffer, length) @@ -441,6 +447,7 @@ Connection.prototype.parseMessage = function (buffer) { case 0x64: // d return this.parsed(buffer, length) } + console.log('could not parse', packet) } Connection.prototype.parseR = function (buffer, length) {