mirror of
https://github.com/brianc/node-postgres.git
synced 2026-01-18 15:55:05 +00:00
commit
84af374ab7
@ -22,6 +22,13 @@ var Connection = function(config) {
|
||||
this.ssl = config.ssl || false;
|
||||
this._ending = false;
|
||||
this._mode = TEXT_MODE;
|
||||
this._emitMessage = false;
|
||||
var self = this;
|
||||
this.on('newListener', function(eventName) {
|
||||
if(eventName == 'message') {
|
||||
self._emitMessage = true;
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
util.inherits(Connection, EventEmitter);
|
||||
@ -41,7 +48,7 @@ Connection.prototype.connect = function(port, host) {
|
||||
});
|
||||
|
||||
this.stream.on('error', function(error) {
|
||||
//don't raise ECONNRESET errors - they can & should be ignored
|
||||
//don't raise ECONNRESET errors - they can & should be ignored
|
||||
//during disconnect
|
||||
if(self._ending && error.code == 'ECONNRESET') {
|
||||
return;
|
||||
@ -80,16 +87,17 @@ Connection.prototype.connect = function(port, host) {
|
||||
};
|
||||
|
||||
Connection.prototype.attachListeners = function(stream) {
|
||||
var self = this;
|
||||
stream.on('data', function(buffer) {
|
||||
self.setBuffer(buffer);
|
||||
var msg = self.parseMessage();
|
||||
stream.on('data', function(buff) {
|
||||
this.setBuffer(buff);
|
||||
var msg = this.parseMessage();
|
||||
while(msg) {
|
||||
self.emit('message', msg);
|
||||
self.emit(msg.name, msg);
|
||||
msg = self.parseMessage();
|
||||
if(this._emitMessage) {
|
||||
this.emit('message', msg);
|
||||
}
|
||||
this.emit(msg.name, msg);
|
||||
msg = this.parseMessage();
|
||||
}
|
||||
});
|
||||
}.bind(this));
|
||||
};
|
||||
|
||||
Connection.prototype.requestSsl = function(config) {
|
||||
@ -293,10 +301,29 @@ Connection.prototype.setBuffer = function(buffer) {
|
||||
buffer.copy(combinedBuffer, remaining, 0);
|
||||
buffer = combinedBuffer;
|
||||
}
|
||||
this.lastBuffer = false;
|
||||
this.buffer = buffer;
|
||||
this.offset = 0;
|
||||
};
|
||||
|
||||
Connection.prototype.readSslResponse = function() {
|
||||
var remaining = this.buffer.length - (this.offset);
|
||||
if(remaining < 1) {
|
||||
this.lastBuffer = this.buffer;
|
||||
this.lastOffset = this.offset;
|
||||
return false;
|
||||
}
|
||||
return {
|
||||
name: 'sslresponse',
|
||||
text: this.buffer[this.offset++]
|
||||
};
|
||||
};
|
||||
|
||||
var Message = function(name, length) {
|
||||
this.name = name;
|
||||
this.length = length;
|
||||
};
|
||||
|
||||
Connection.prototype.parseMessage = function() {
|
||||
var remaining = this.buffer.length - (this.offset);
|
||||
if(remaining < 5) {
|
||||
@ -309,8 +336,9 @@ Connection.prototype.parseMessage = function() {
|
||||
|
||||
//read message id code
|
||||
var id = this.buffer[this.offset++];
|
||||
var buffer = this.buffer;
|
||||
//read message length
|
||||
var length = this.parseInt32();
|
||||
var length = this.parseInt32(buffer);
|
||||
|
||||
if(remaining <= length) {
|
||||
this.lastBuffer = this.buffer;
|
||||
@ -319,103 +347,81 @@ Connection.prototype.parseMessage = function() {
|
||||
return false;
|
||||
}
|
||||
|
||||
var msg = {
|
||||
length: length
|
||||
};
|
||||
switch(id)
|
||||
{
|
||||
|
||||
case 0x52: //R
|
||||
msg.name = 'authenticationOk';
|
||||
return this.parseR(msg);
|
||||
return this.parseR(buffer, length);
|
||||
|
||||
case 0x53: //S
|
||||
msg.name = 'parameterStatus';
|
||||
return this.parseS(msg);
|
||||
return this.parseS(buffer, length);
|
||||
|
||||
case 0x4b: //K
|
||||
msg.name = 'backendKeyData';
|
||||
return this.parseK(msg);
|
||||
return this.parseK(buffer, length);
|
||||
|
||||
case 0x43: //C
|
||||
msg.name = 'commandComplete';
|
||||
return this.parseC(msg);
|
||||
return this.parseC(buffer, length);
|
||||
|
||||
case 0x5a: //Z
|
||||
msg.name = 'readyForQuery';
|
||||
return this.parseZ(msg);
|
||||
return this.parseZ(buffer, length);
|
||||
|
||||
case 0x54: //T
|
||||
msg.name = 'rowDescription';
|
||||
return this.parseT(msg);
|
||||
return this.parseT(buffer, length);
|
||||
|
||||
case 0x44: //D
|
||||
msg.name = 'dataRow';
|
||||
return this.parseD(msg);
|
||||
return this.parseD(buffer, length);
|
||||
|
||||
case 0x45: //E
|
||||
msg.name = 'error';
|
||||
return this.parseE(msg);
|
||||
return this.parseE(buffer, length);
|
||||
|
||||
case 0x4e: //N
|
||||
msg.name = 'notice';
|
||||
return this.parseN(msg);
|
||||
return this.parseN(buffer, length);
|
||||
|
||||
case 0x31: //1
|
||||
msg.name = 'parseComplete';
|
||||
return msg;
|
||||
return new Message('parseComplete', length);
|
||||
|
||||
case 0x32: //2
|
||||
msg.name = 'bindComplete';
|
||||
return msg;
|
||||
return new Message('bindComplete', length);
|
||||
|
||||
case 0x41: //A
|
||||
msg.name = 'notification';
|
||||
return this.parseA(msg);
|
||||
return this.parseA(buffer, length);
|
||||
|
||||
case 0x6e: //n
|
||||
msg.name = 'noData';
|
||||
return msg;
|
||||
return new Message('noData', length);
|
||||
|
||||
case 0x49: //I
|
||||
msg.name = 'emptyQuery';
|
||||
return msg;
|
||||
return new Message('emptyQuery', length);
|
||||
|
||||
case 0x73: //s
|
||||
msg.name = 'portalSuspended';
|
||||
return msg;
|
||||
return new Message('portalSuspended', length);
|
||||
|
||||
case 0x47: //G
|
||||
msg.name = 'copyInResponse';
|
||||
return this.parseGH(msg);
|
||||
return this.parseG(buffer, length);
|
||||
|
||||
case 0x48: //H
|
||||
msg.name = 'copyOutResponse';
|
||||
return this.parseGH(msg);
|
||||
return this.parseH(buffer, length);
|
||||
|
||||
case 0x63: //c
|
||||
msg.name = 'copyDone';
|
||||
return msg;
|
||||
return new Message('copyDone', length);
|
||||
|
||||
case 0x64: //d
|
||||
msg.name = 'copyData';
|
||||
return this.parsed(msg);
|
||||
|
||||
default:
|
||||
throw new Error("Unrecognized message code " + id);
|
||||
return this.parsed(buffer, length);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
Connection.prototype.parseR = function(msg) {
|
||||
Connection.prototype.parseR = function(buffer, length) {
|
||||
var code = 0;
|
||||
var msg = new Message('authenticationOk', length);
|
||||
if(msg.length === 8) {
|
||||
code = this.parseInt32();
|
||||
code = this.parseInt32(buffer);
|
||||
if(code === 3) {
|
||||
msg.name = 'authenticationCleartextPassword';
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
if(msg.length === 12) {
|
||||
code = this.parseInt32();
|
||||
code = this.parseInt32(buffer);
|
||||
if(code === 5) { //md5 required
|
||||
msg.name = 'authenticationMD5Password';
|
||||
msg.salt = new Buffer(4);
|
||||
@ -427,86 +433,113 @@ Connection.prototype.parseR = function(msg) {
|
||||
throw new Error("Unknown authenticatinOk message type" + util.inspect(msg));
|
||||
};
|
||||
|
||||
Connection.prototype.parseS = function(msg) {
|
||||
msg.parameterName = this.parseCString();
|
||||
msg.parameterValue = this.parseCString();
|
||||
Connection.prototype.parseS = function(buffer, length) {
|
||||
var msg = new Message('parameterStatus', length);
|
||||
msg.parameterName = this.parseCString(buffer);
|
||||
msg.parameterValue = this.parseCString(buffer);
|
||||
return msg;
|
||||
};
|
||||
|
||||
Connection.prototype.parseK = function(msg) {
|
||||
msg.processID = this.parseInt32();
|
||||
msg.secretKey = this.parseInt32();
|
||||
Connection.prototype.parseK = function(buffer, length) {
|
||||
var msg = new Message('backendKeyData', length);
|
||||
msg.processID = this.parseInt32(buffer);
|
||||
msg.secretKey = this.parseInt32(buffer);
|
||||
return msg;
|
||||
};
|
||||
|
||||
Connection.prototype.parseC = function(msg) {
|
||||
msg.text = this.parseCString();
|
||||
Connection.prototype.parseC = function(buffer, length) {
|
||||
var msg = new Message('commandComplete', length);
|
||||
msg.text = this.parseCString(buffer);
|
||||
return msg;
|
||||
};
|
||||
|
||||
Connection.prototype.parseZ = function(msg) {
|
||||
msg.status = this.readChar();
|
||||
Connection.prototype.parseZ = function(buffer, length) {
|
||||
var msg = new Message('readyForQuery', length);
|
||||
msg.name = 'readyForQuery';
|
||||
msg.status = this.readString(buffer, 1);
|
||||
return msg;
|
||||
};
|
||||
|
||||
Connection.prototype.parseT = function(msg) {
|
||||
msg.fieldCount = this.parseInt16();
|
||||
ROW_DESCRIPTION = 'rowDescription';
|
||||
Connection.prototype.parseT = function(buffer, length) {
|
||||
var msg = new Message(ROW_DESCRIPTION, length);
|
||||
msg.fieldCount = this.parseInt16(buffer);
|
||||
var fields = [];
|
||||
for(var i = 0; i < msg.fieldCount; i++){
|
||||
fields.push(this.parseField());
|
||||
fields.push(this.parseField(buffer));
|
||||
}
|
||||
msg.fields = fields;
|
||||
return msg;
|
||||
};
|
||||
|
||||
Connection.prototype.parseField = function() {
|
||||
var field = {
|
||||
name: this.parseCString(),
|
||||
tableID: this.parseInt32(),
|
||||
columnID: this.parseInt16(),
|
||||
dataTypeID: this.parseInt32(),
|
||||
dataTypeSize: this.parseInt16(),
|
||||
dataTypeModifier: this.parseInt32(),
|
||||
format: undefined
|
||||
};
|
||||
if(this.parseInt16() === TEXT_MODE) {
|
||||
var Field = function() {
|
||||
this.name = null;
|
||||
this.tableID = null;
|
||||
this.columnID = null;
|
||||
this.dataTypeID = null;
|
||||
this.dataTypeSize = null;
|
||||
this.dataTypeModifier = null;
|
||||
this.format = null;
|
||||
};
|
||||
|
||||
FORMAT_TEXT = 'text';
|
||||
FORMAT_BINARY = 'binary';
|
||||
Connection.prototype.parseField = function(buffer) {
|
||||
var field = new Field();
|
||||
field.name = this.parseCString(buffer);
|
||||
field.tableID = this.parseInt32(buffer);
|
||||
field.columnID = this.parseInt16(buffer);
|
||||
field.dataTypeID = this.parseInt32(buffer);
|
||||
field.dataTypeSize = this.parseInt16(buffer);
|
||||
field.dataTypeModifier = this.parseInt32(buffer);
|
||||
if(this.parseInt16(buffer) === TEXT_MODE) {
|
||||
this._mode = TEXT_MODE;
|
||||
field.format = 'text';
|
||||
field.format = FORMAT_TEXT;
|
||||
} else {
|
||||
this._mode = BINARY_MODE;
|
||||
field.format = 'binary';
|
||||
field.format = FORMAT_BINARY;
|
||||
}
|
||||
return field;
|
||||
};
|
||||
|
||||
Connection.prototype.parseD = function(msg) {
|
||||
var fieldCount = this.parseInt16();
|
||||
var fields = [];
|
||||
DATA_ROW = 'dataRow';
|
||||
var DataRowMessage = function(name, length, fieldCount) {
|
||||
this.name = DATA_ROW;
|
||||
this.length = length;
|
||||
this.fieldCount = fieldCount;
|
||||
this.fields = [];
|
||||
};
|
||||
|
||||
|
||||
//extremely hot-path code
|
||||
Connection.prototype.parseD = function(buffer, length) {
|
||||
var fieldCount = this.parseInt16(buffer);
|
||||
var msg = new DataRowMessage(length, fieldCount);
|
||||
for(var i = 0; i < fieldCount; i++) {
|
||||
var length = this.parseInt32();
|
||||
var value = null;
|
||||
if(length !== -1) {
|
||||
if(this._mode === TEXT_MODE) {
|
||||
value = this.readString(length);
|
||||
} else {
|
||||
value = this.readBytes(length);
|
||||
}
|
||||
}
|
||||
fields.push(value);
|
||||
msg.fields.push(this._readValue(buffer));
|
||||
}
|
||||
msg.fieldCount = fieldCount;
|
||||
msg.fields = fields;
|
||||
return msg;
|
||||
};
|
||||
|
||||
//extremely hot-path code
|
||||
Connection.prototype._readValue = function(buffer) {
|
||||
var length = this.parseInt32(buffer);
|
||||
if(length === -1) return null;
|
||||
if(this._mode === TEXT_MODE) {
|
||||
return this.readString(buffer, length);
|
||||
}
|
||||
return this.readBytes(buffer, length);
|
||||
};
|
||||
|
||||
//parses error
|
||||
Connection.prototype.parseE = function(input) {
|
||||
Connection.prototype.parseE = function(buffer, length) {
|
||||
var fields = {};
|
||||
var msg, item;
|
||||
var fieldType = this.readString(1);
|
||||
var input = new Message('error', length);
|
||||
var fieldType = this.readString(buffer, 1);
|
||||
while(fieldType != '\0') {
|
||||
fields[fieldType] = this.parseCString();
|
||||
fieldType = this.readString(1);
|
||||
fields[fieldType] = this.parseCString(buffer);
|
||||
fieldType = this.readString(buffer, 1);
|
||||
}
|
||||
if(input.name === 'error') {
|
||||
// the msg is an Error instance
|
||||
@ -537,61 +570,72 @@ Connection.prototype.parseE = function(input) {
|
||||
};
|
||||
|
||||
//same thing, different name
|
||||
Connection.prototype.parseN = Connection.prototype.parseE;
|
||||
|
||||
Connection.prototype.parseA = function(msg) {
|
||||
msg.processId = this.parseInt32();
|
||||
msg.channel = this.parseCString();
|
||||
msg.payload = this.parseCString();
|
||||
Connection.prototype.parseN = function(buffer, length) {
|
||||
var msg = this.parseE(buffer, length);
|
||||
msg.name = 'notice';
|
||||
return msg;
|
||||
};
|
||||
|
||||
Connection.prototype.parseGH = function (msg) {
|
||||
Connection.prototype.parseA = function(buffer, length) {
|
||||
var msg = new Message('notification', length);
|
||||
msg.processId = this.parseInt32(buffer);
|
||||
msg.channel = this.parseCString(buffer);
|
||||
msg.payload = this.parseCString(buffer);
|
||||
return msg;
|
||||
};
|
||||
|
||||
Connection.prototype.parseG = function (buffer, length) {
|
||||
var msg = new Message('copyInResponse', length);
|
||||
return this.parseGH(buffer, msg);
|
||||
};
|
||||
|
||||
Connection.prototype.parseH = function(buffer, length) {
|
||||
var msg = new Message('copyOutResponse', length);
|
||||
return this.parseGH(buffer, msg);
|
||||
};
|
||||
|
||||
Connection.prototype.parseGH = function (buffer, msg) {
|
||||
var isBinary = this.buffer[this.offset] !== 0;
|
||||
this.offset++;
|
||||
msg.binary = isBinary;
|
||||
var columnCount = this.parseInt16();
|
||||
var columnCount = this.parseInt16(buffer);
|
||||
msg.columnTypes = [];
|
||||
for(var i = 0; i<columnCount; i++) {
|
||||
msg.columnTypes.push(this.parseInt16());
|
||||
msg.columnTypes.push(this.parseInt16(buffer));
|
||||
}
|
||||
return msg;
|
||||
};
|
||||
|
||||
Connection.prototype.readChar = function() {
|
||||
return this.readString(1);
|
||||
Connection.prototype.parsed = function (buffer, length) {
|
||||
var msg = new Message('copyData', length);
|
||||
msg.chunk = this.readBytes(buffer, msg.length - 4);
|
||||
return msg;
|
||||
};
|
||||
|
||||
Connection.prototype.parseInt32 = function() {
|
||||
var value = this.buffer.readInt32BE(this.offset, true);
|
||||
Connection.prototype.parseInt32 = function(buffer) {
|
||||
var value = buffer.readInt32BE(this.offset, true);
|
||||
this.offset += 4;
|
||||
return value;
|
||||
};
|
||||
|
||||
Connection.prototype.parseInt16 = function() {
|
||||
var value = this.buffer.readInt16BE(this.offset, true);
|
||||
Connection.prototype.parseInt16 = function(buffer) {
|
||||
var value = buffer.readInt16BE(this.offset, true);
|
||||
this.offset += 2;
|
||||
return value;
|
||||
};
|
||||
|
||||
Connection.prototype.readString = function(length) {
|
||||
return this.buffer.toString(this.encoding, this.offset, (this.offset += length));
|
||||
Connection.prototype.readString = function(buffer, length) {
|
||||
return buffer.toString(this.encoding, this.offset, (this.offset += length));
|
||||
};
|
||||
|
||||
Connection.prototype.readBytes = function(length) {
|
||||
return this.buffer.slice(this.offset, this.offset += length);
|
||||
Connection.prototype.readBytes = function(buffer, length) {
|
||||
return buffer.slice(this.offset, this.offset += length);
|
||||
};
|
||||
|
||||
Connection.prototype.parseCString = function() {
|
||||
Connection.prototype.parseCString = function(buffer) {
|
||||
var start = this.offset;
|
||||
while(this.buffer[this.offset++] !== 0) { }
|
||||
return this.buffer.toString(this.encoding, start, this.offset - 1);
|
||||
};
|
||||
|
||||
Connection.prototype.parsed = function (msg) {
|
||||
//exclude length field
|
||||
msg.chunk = this.readBytes(msg.length - 4);
|
||||
return msg;
|
||||
while(buffer[this.offset++] !== 0) { }
|
||||
return buffer.toString(this.encoding, start, this.offset - 1);
|
||||
};
|
||||
//end parsing methods
|
||||
module.exports = Connection;
|
||||
|
||||
@ -10,6 +10,7 @@ var Result = function(rowMode) {
|
||||
this.rows = [];
|
||||
this.fields = [];
|
||||
this._parsers = [];
|
||||
this.RowCtor = null;
|
||||
if(rowMode == "array") {
|
||||
this.parseRow = this._parseRowAsArray;
|
||||
}
|
||||
@ -56,25 +57,18 @@ Result.prototype._parseRowAsArray = function(rowData) {
|
||||
//rowData is an array of text or binary values
|
||||
//this turns the row into a JavaScript object
|
||||
Result.prototype.parseRow = function(rowData) {
|
||||
var row = {};
|
||||
for(var i = 0, len = rowData.length; i < len; i++) {
|
||||
var rawValue = rowData[i];
|
||||
var field = this.fields[i];
|
||||
var fieldType = field.dataTypeID;
|
||||
var parsedValue = null;
|
||||
if(rawValue !== null) {
|
||||
parsedValue = this._parsers[i](rawValue);
|
||||
}
|
||||
var fieldName = field.name;
|
||||
row[fieldName] = parsedValue;
|
||||
}
|
||||
return row;
|
||||
return new this.RowCtor(this._parsers, rowData);
|
||||
};
|
||||
|
||||
Result.prototype.addRow = function(row) {
|
||||
this.rows.push(row);
|
||||
};
|
||||
|
||||
var inlineParser = function(fieldName, i) {
|
||||
return "\nthis['" + fieldName + "'] = " +
|
||||
"rowData[" + i + "] == null ? null : parsers[" + i + "](rowData[" + i + "]);";
|
||||
};
|
||||
|
||||
Result.prototype.addFields = function(fieldDescriptions) {
|
||||
//clears field definitions
|
||||
//multiple query statements in 1 action can result in multiple sets
|
||||
@ -84,11 +78,17 @@ Result.prototype.addFields = function(fieldDescriptions) {
|
||||
this.fields = [];
|
||||
this._parsers = [];
|
||||
}
|
||||
var ctorBody = "";
|
||||
for(var i = 0; i < fieldDescriptions.length; i++) {
|
||||
var desc = fieldDescriptions[i];
|
||||
this.fields.push(desc);
|
||||
this._parsers.push(types.getTypeParser(desc.dataTypeID, desc.format || 'text'));
|
||||
var parser = types.getTypeParser(desc.dataTypeID, desc.format || 'text');
|
||||
this._parsers.push(parser);
|
||||
//this is some craziness to compile the row result parsing
|
||||
//results in ~60% speedup on large query result sets
|
||||
ctorBody += inlineParser(desc.name, i);
|
||||
}
|
||||
this.RowCtor = Function("parsers", "rowData", ctorBody);
|
||||
};
|
||||
|
||||
module.exports = Result;
|
||||
|
||||
@ -17,7 +17,6 @@ var getTypeParser = function(oid, format) {
|
||||
if (!typeParsers[format]) {
|
||||
return noParse;
|
||||
}
|
||||
|
||||
return typeParsers[format][oid] || noParse;
|
||||
};
|
||||
|
||||
@ -30,9 +29,7 @@ var setTypeParser = function(oid, format, parseFn) {
|
||||
};
|
||||
|
||||
textParsers.init(function(oid, converter) {
|
||||
typeParsers.text[oid] = function(value) {
|
||||
return converter(String(value));
|
||||
};
|
||||
typeParsers.text[oid] = converter;
|
||||
});
|
||||
|
||||
binaryParsers.init(function(oid, converter) {
|
||||
|
||||
@ -175,7 +175,6 @@ var init = function(register) {
|
||||
register(26, parseInteger); // oid
|
||||
register(700, parseFloat); // float4/real
|
||||
register(701, parseFloat); // float8/double
|
||||
//register(1700, parseString); // numeric/decimal
|
||||
register(16, parseBool);
|
||||
register(1082, parseDate); // date
|
||||
register(1114, parseDate); // timestamp without timezone
|
||||
|
||||
5
script/setup-bench-data.js
Normal file
5
script/setup-bench-data.js
Normal file
@ -0,0 +1,5 @@
|
||||
var pg = require('../lib');
|
||||
var
|
||||
pg.connect(function(err, client) {
|
||||
|
||||
})
|
||||
@ -1,4 +1,5 @@
|
||||
require(__dirname+'/test-helper');
|
||||
return false;
|
||||
var Connection = require(__dirname + '/../../../lib/connection');
|
||||
var buffers = require(__dirname + '/../../test-buffers');
|
||||
var PARSE = function(buffer) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user