diff --git a/lib/client.js b/lib/client.js index 672943fa..bd03425a 100644 --- a/lib/client.js +++ b/lib/client.js @@ -20,25 +20,23 @@ var Client = function(config) { this.host = this.connectionParameters.host; this.password = this.connectionParameters.password; - config = config || {}; + var c = config || {}; - this.connection = config.connection || new Connection({ - stream: config.stream, - ssl: config.ssl + this.connection = c.connection || new Connection({ + stream: c.stream, + ssl: c.ssl }); this.queryQueue = []; - this.binary = config.binary || defaults.binary; + this.binary = c.binary || defaults.binary; this.encoding = 'utf8'; this.processID = null; this.secretKey = null; - this.ssl = config.ssl || false; + this.ssl = c.ssl || false; }; util.inherits(Client, EventEmitter); -var p = Client.prototype; - -p.connect = function(callback) { +Client.prototype.connect = function(callback) { var self = this; var con = this.connection; if(this.host && this.host.indexOf('/') === 0) { @@ -50,7 +48,7 @@ p.connect = function(callback) { //once connection is established send startup message con.on('connect', function() { - if (self.ssl) { + if(self.ssl) { con.requestSsl(); } else { con.startup({ @@ -59,6 +57,7 @@ p.connect = function(callback) { }); } }); + con.on('sslconnect', function() { con.startup({ user: self.user, @@ -91,10 +90,12 @@ p.connect = function(callback) { con.on('rowDescription', function(msg) { self.activeQuery.handleRowDescription(msg); }); + //delegate datarow to active query con.on('dataRow', function(msg) { self.activeQuery.handleDataRow(msg); }); + //TODO should query gain access to connection? con.on('portalSuspended', function(msg) { self.activeQuery.getRows(con); @@ -108,11 +109,13 @@ p.connect = function(callback) { con.sync(); } }); + con.on('copyInResponse', function(msg) { self.activeQuery.streamData(self.connection); }); + con.on('copyOutResponse', function(msg) { - if (self.activeQuery.stream === undefined) { + if(self.activeQuery.stream === undefined) { self.activeQuery._canceledDueToError = new Error('No destination stream defined'); //canceling query requires creation of new connection @@ -121,9 +124,11 @@ p.connect = function(callback) { .cancel(self, self.activeQuery); } }); + con.on('copyData', function (msg) { self.activeQuery.handleCopyFromChunk(msg.chunk); }); + if (!callback) { self.emit('connect'); } else { @@ -170,8 +175,8 @@ p.connect = function(callback) { }; -p.cancel = function(client, query) { - if (client.activeQuery == query) { +Client.prototype.cancel = function(client, query) { + if(client.activeQuery == query) { var con = this.connection; if(this.host && this.host.indexOf('/') === 0) { @@ -184,13 +189,12 @@ p.cancel = function(client, query) { con.on('connect', function() { con.cancel(client.processID, client.secretKey); }); - } - else if (client.queryQueue.indexOf(query) != -1) { + } else if(client.queryQueue.indexOf(query) != -1) { client.queryQueue.splice(client.queryQueue.indexOf(query), 1); } }; -p._pulseQueryQueue = function() { +Client.prototype._pulseQueryQueue = function() { if(this.readyForQuery===true) { this.activeQuery = this.queryQueue.shift(); if(this.activeQuery) { @@ -199,40 +203,48 @@ p._pulseQueryQueue = function() { this.activeQuery.submit(this.connection); } else if(this.hasExecuted) { this.activeQuery = null; - if(this._drainPaused > 0) { this._drainPaused++; } - else { this.emit('drain'); } + //TODO remove pauseDrain for v1.0 + if(this._drainPaused > 0) { + this._drainPaused++; + } + else { + this.emit('drain'); + } } } }; -p._copy = function (text, stream) { - var config = {}, - query; + +Client.prototype._copy = function (text, stream) { + var config = {}; config.text = text; config.stream = stream; config.callback = function (error) { - if (error) { + if(error) { config.stream.error(error); } else { config.stream.close(); } }; - query = new Query(config); + var query = new Query(config); this.queryQueue.push(query); this._pulseQueryQueue(); return config.stream; }; -p.copyFrom = function (text) { + +Client.prototype.copyFrom = function (text) { return this._copy(text, new CopyFromStream()); }; -p.copyTo = function (text) { + +Client.prototype.copyTo = function (text) { return this._copy(text, new CopyToStream()); }; -p.query = function(config, values, callback) { + +Client.prototype.query = function(config, values, callback) { //can take in strings, config object or query object var query = (config instanceof Query) ? config : new Query(config, values, callback); - if (this.binary && !query.binary) { + if(this.binary && !query.binary) { query.binary = true; } @@ -243,19 +255,19 @@ p.query = function(config, values, callback) { //prevents client from otherwise emitting 'drain' event until 'resumeDrain' is //called -p.pauseDrain = function() { +Client.prototype.pauseDrain = function() { this._drainPaused = 1; }; //resume raising 'drain' event -p.resumeDrain = function() { +Client.prototype.resumeDrain = function() { if(this._drainPaused > 1) { this.emit('drain'); } this._drainPaused = 0; }; -p.end = function() { +Client.prototype.end = function() { this.connection.end(); }; diff --git a/lib/connection.js b/lib/connection.js index ffd7d9c5..c1a120df 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -22,13 +22,11 @@ var Connection = function(config) { util.inherits(Connection, EventEmitter); -var p = Connection.prototype; +Connection.prototype.connect = function(port, host) { -p.connect = function(port, host) { - - if (this.stream.readyState === 'closed') { + if(this.stream.readyState === 'closed') { this.stream.connect(port, host); - } else if (this.stream.readyState == 'open') { + } else if(this.stream.readyState == 'open') { this.emit('connect'); } @@ -50,7 +48,7 @@ p.connect = function(port, host) { self.emit(msg.name, msg); }); this.once('sslresponse', function(msg) { - if (msg.text == 0x53) { + if(msg.text == 0x53) { var tls = require('tls'); self.stream.removeAllListeners(); self.stream = tls.connect({ @@ -79,7 +77,7 @@ p.connect = function(port, host) { } }; -p.attachListeners = function(stream) { +Connection.prototype.attachListeners = function(stream) { var self = this; stream.on('data', function(buffer) { self.setBuffer(buffer); @@ -92,7 +90,7 @@ p.attachListeners = function(stream) { }); }; -p.requestSsl = function(config) { +Connection.prototype.requestSsl = function(config) { this.checkSslResponse = true; var bodyBuffer = this.writer @@ -108,7 +106,7 @@ p.requestSsl = function(config) { this.stream.write(buffer); }; -p.startup = function(config) { +Connection.prototype.startup = function(config) { var bodyBuffer = this.writer .addInt16(3) .addInt16(0) @@ -130,7 +128,7 @@ p.startup = function(config) { this.stream.write(buffer); }; -p.cancel = function(processID, secretKey) { +Connection.prototype.cancel = function(processID, secretKey) { var bodyBuffer = this.writer .addInt16(1234) .addInt16(5678) @@ -147,12 +145,12 @@ p.cancel = function(processID, secretKey) { this.stream.write(buffer); }; -p.password = function(password) { +Connection.prototype.password = function(password) { //0x70 = 'p' this._send(0x70, this.writer.addCString(password)); }; -p._send = function(code, more) { +Connection.prototype._send = function(code, more) { if(!this.stream.writable) { return false; } if(more === true) { this.writer.addHeader(code); @@ -161,14 +159,14 @@ p._send = function(code, more) { } }; -p.query = function(text) { +Connection.prototype.query = function(text) { //0x51 = Q this.stream.write(this.writer.addCString(text).flush(0x51)); }; //send parse message //"more" === true to buffer the message until flush() is called -p.parse = function(query, more) { +Connection.prototype.parse = function(query, more) { //expect something like this: // { name: 'queryName', // text: 'select * from blah', @@ -193,7 +191,7 @@ p.parse = function(query, more) { //send bind message //"more" === true to buffer the message until flush() is called -p.bind = function(config, more) { +Connection.prototype.bind = function(config, more) { //normalize config config = config || {}; config.portal = config.portal || ''; @@ -216,7 +214,7 @@ p.bind = function(config, more) { } } - if (config.binary) { + if(config.binary) { buffer.addInt16(1); // format codes to use binary buffer.addInt16(1); } @@ -229,7 +227,7 @@ p.bind = function(config, more) { //send execute message //"more" === true to buffer the message until flush() is called -p.execute = function(config, more) { +Connection.prototype.execute = function(config, more) { config = config || {}; config.portal = config.portal || ''; config.rows = config.rows || ''; @@ -243,13 +241,13 @@ p.execute = function(config, more) { var emptyBuffer = Buffer(0); -p.flush = function() { +Connection.prototype.flush = function() { //0x48 = 'H' this.writer.add(emptyBuffer); this._send(0x48); }; -p.sync = function() { +Connection.prototype.sync = function() { //clear out any pending data in the writer this.writer.flush(0); @@ -257,29 +255,33 @@ p.sync = function() { this._send(0x53); }; -p.end = function() { +Connection.prototype.end = function() { //0x58 = 'X' this.writer.add(emptyBuffer); this._send(0x58); }; -p.describe = function(msg, more) { +Connection.prototype.describe = function(msg, more) { this.writer.addCString(msg.type + (msg.name || '')); this._send(0x44, more); }; -p.sendCopyFromChunk = function (chunk) { + +Connection.prototype.sendCopyFromChunk = function (chunk) { this.stream.write(this.writer.add(chunk).flush(0x64)); }; -p.endCopyFrom = function () { + +Connection.prototype.endCopyFrom = function () { this.stream.write(this.writer.add(emptyBuffer).flush(0x63)); }; -p.sendCopyFail = function (msg) { + +Connection.prototype.sendCopyFail = function (msg) { //this.stream.write(this.writer.add(emptyBuffer).flush(0x66)); this.writer.addCString(msg); this._send(0x66); }; + //parsing methods -p.setBuffer = function(buffer) { +Connection.prototype.setBuffer = function(buffer) { if(this.lastBuffer) { //we have unfinished biznaz //need to combine last two buffers var remaining = this.lastBuffer.length - this.lastOffset; @@ -292,17 +294,20 @@ p.setBuffer = function(buffer) { this.offset = 0; }; -p.readSslResponse = function() { +Connection.prototype.readSslResponse = function() { var remaining = this.buffer.length - (this.offset); if(remaining < 1) { this.lastBuffer = this.buffer; this.lastOffset = this.offset; return false; } - return { name: 'sslresponse', text: this.buffer[this.offset++] }; + return { + name: 'sslresponse', + text: this.buffer[this.offset++] + }; }; -p.parseMessage = function() { +Connection.prototype.parseMessage = function() { var remaining = this.buffer.length - (this.offset); if(remaining < 5) { //cannot read id + length without at least 5 bytes @@ -410,7 +415,7 @@ p.parseMessage = function() { } }; -p.parseR = function(msg) { +Connection.prototype.parseR = function(msg) { var code = 0; if(msg.length === 8) { code = this.parseInt32(); @@ -432,29 +437,29 @@ p.parseR = function(msg) { throw new Error("Unknown authenticatinOk message type" + util.inspect(msg)); }; -p.parseS = function(msg) { +Connection.prototype.parseS = function(msg) { msg.parameterName = this.parseCString(); msg.parameterValue = this.parseCString(); return msg; }; -p.parseK = function(msg) { +Connection.prototype.parseK = function(msg) { msg.processID = this.parseInt32(); msg.secretKey = this.parseInt32(); return msg; }; -p.parseC = function(msg) { +Connection.prototype.parseC = function(msg) { msg.text = this.parseCString(); return msg; }; -p.parseZ = function(msg) { +Connection.prototype.parseZ = function(msg) { msg.status = this.readChar(); return msg; }; -p.parseT = function(msg) { +Connection.prototype.parseT = function(msg) { msg.fieldCount = this.parseInt16(); var fields = []; for(var i = 0; i < msg.fieldCount; i++){ @@ -464,7 +469,7 @@ p.parseT = function(msg) { return msg; }; -p.parseField = function() { +Connection.prototype.parseField = function() { var field = { name: this.parseCString(), tableID: this.parseInt32(), @@ -477,7 +482,7 @@ p.parseField = function() { return field; }; -p.parseD = function(msg) { +Connection.prototype.parseD = function(msg) { var fieldCount = this.parseInt16(); var fields = []; for(var i = 0; i < fieldCount; i++) { @@ -490,7 +495,7 @@ p.parseD = function(msg) { }; //parses error -p.parseE = function(input) { +Connection.prototype.parseE = function(input) { var fields = {}; var msg, item; var fieldType = this.readString(1); @@ -498,12 +503,12 @@ p.parseE = function(input) { fields[fieldType] = this.parseCString(); fieldType = this.readString(1); } - if (input.name === 'error') { + if(input.name === 'error') { // the msg is an Error instance msg = new Error(fields.M); for (item in input) { // copy input properties to the error - if (input.hasOwnProperty(item)) { + if(input.hasOwnProperty(item)) { msg[item] = input[item]; } } @@ -527,15 +532,16 @@ p.parseE = function(input) { }; //same thing, different name -p.parseN = p.parseE; +Connection.prototype.parseN = Connection.prototype.parseE; -p.parseA = function(msg) { +Connection.prototype.parseA = function(msg) { msg.processId = this.parseInt32(); msg.channel = this.parseCString(); msg.payload = this.parseCString(); return msg; }; -p.parseGH = function (msg) { + +Connection.prototype.parseGH = function (msg) { msg.binary = Boolean(this.parseInt8()); var columnCount = this.parseInt16(); msg.columnTypes = []; @@ -544,22 +550,24 @@ p.parseGH = function (msg) { } return msg; }; -p.parseInt8 = function () { + +Connection.prototype.parseInt8 = function () { var value = Number(this.buffer[this.offset]); this.offset++; return value; }; -p.readChar = function() { + +Connection.prototype.readChar = function() { return Buffer([this.buffer[this.offset++]]).toString(this.encoding); }; -p.parseInt32 = function() { +Connection.prototype.parseInt32 = function() { var value = this.peekInt32(); this.offset += 4; return value; }; -p.peekInt32 = function(offset) { +Connection.prototype.peekInt32 = function(offset) { offset = offset || this.offset; var buffer = this.buffer; return ((buffer[offset++] << 24) + @@ -569,26 +577,27 @@ p.peekInt32 = function(offset) { }; -p.parseInt16 = function() { +Connection.prototype.parseInt16 = function() { return ((this.buffer[this.offset++] << 8) + (this.buffer[this.offset++] << 0)); }; -p.readString = function(length) { +Connection.prototype.readString = function(length) { return this.buffer.toString(this.encoding, this.offset, (this.offset += length)); }; -p.readBytes = function(length) { +Connection.prototype.readBytes = function(length) { return this.buffer.slice(this.offset, this.offset += length); }; -p.parseCString = function() { +Connection.prototype.parseCString = function() { var start = this.offset; while(this.buffer[this.offset++]) { } return this.buffer.toString(this.encoding, start, this.offset - 1); }; -p.parsed = function (msg) { + +Connection.prototype.parsed = function (msg) { //exclude length field msg.chunk = this.readBytes(msg.length - 4); return msg; diff --git a/lib/copystream.js b/lib/copystream.js index 35f276d4..f82aadd6 100644 --- a/lib/copystream.js +++ b/lib/copystream.js @@ -11,28 +11,32 @@ var CopyFromStream = function () { this._dataBuffered = false; this.__defineGetter__("writable", this._writable.bind(this)); }; + util.inherits(CopyFromStream, Stream); + CopyFromStream.prototype._writable = function () { return !(this._finished || this._error); }; + CopyFromStream.prototype.startStreamingToConnection = function (connection) { - if (this._error) { + if(this._error) { return; } this._connection = connection; this._sendIfConnectionReady(); this._endIfNeedAndPossible(); }; + CopyFromStream.prototype._handleChunk = function (string, encoding) { var dataChunk, tmpBuffer; - if (string !== undefined) { - if (string instanceof Buffer) { + if(string !== undefined) { + if(string instanceof Buffer) { dataChunk = string; } else { dataChunk = new Buffer(string, encoding); } - if (this._buffer.length) { + if(this._buffer.length) { //Buffer.concat is better, but it's missing //in node v0.6.x tmpBuffer = new Buffer(this._buffer.length + dataChunk.length); @@ -46,12 +50,13 @@ CopyFromStream.prototype._handleChunk = function (string, encoding) { return this._sendIfConnectionReady(); }; + CopyFromStream.prototype._sendIfConnectionReady = function () { var dataSent = false; - if (this._connection) { + if(this._connection) { dataSent = this._connection.sendCopyFromChunk(this._buffer); this._buffer = new Buffer(0); - if (this._dataBuffered) { + if(this._dataBuffered) { this.emit('drain'); } this._dataBuffered = false; @@ -60,44 +65,50 @@ CopyFromStream.prototype._sendIfConnectionReady = function () { } return dataSent; }; + CopyFromStream.prototype._endIfNeedAndPossible = function () { - if (this._connection && this._finished && !this._finishedSent) { + if(this._connection && this._finished && !this._finishedSent) { this._finishedSent = true; this._connection.endCopyFrom(); } }; + CopyFromStream.prototype.write = function (string, encoding) { - if (this._error || this._finished) { + if(this._error || this._finished) { return false; } return this._handleChunk.apply(this, arguments); }; + CopyFromStream.prototype.end = function (string, encondig) { - if (this._error || this._finished) { + if(this._error || this._finished) { return false; } this._finished = true; - if (string !== undefined) { + if(string !== undefined) { this._handleChunk.apply(this, arguments); } this._endIfNeedAndPossible(); }; + CopyFromStream.prototype.error = function (error) { - if (this._error || this._closed) { + if(this._error || this._closed) { return false; } this._error = true; this.emit('error', error); }; + CopyFromStream.prototype.close = function () { - if (this._error || this._closed) { + if(this._error || this._closed) { return false; } - if (!this._finishedSent) { + if(!this._finishedSent) { throw new Error("seems to be error in code that uses CopyFromStream"); } this.emit("close"); }; + var CopyToStream = function () { Stream.apply(this, arguments); this._error = false; @@ -107,13 +118,15 @@ var CopyToStream = function () { this._encoding = undefined; this.__defineGetter__('readable', this._readable.bind(this)); }; + util.inherits(CopyToStream, Stream); + CopyToStream.prototype._outputDataChunk = function () { - if (this._paused) { + if(this._paused) { return; } - if (this.buffer.length) { - if (this._encoding) { + if(this.buffer.length) { + if(this._encoding) { this.emit('data', this.buffer.toString(this._encoding)); } else { this.emit('data', this.buffer); @@ -121,33 +134,37 @@ CopyToStream.prototype._outputDataChunk = function () { this.buffer = new Buffer(0); } }; + CopyToStream.prototype._readable = function () { return !this._finished && !this._error; }; + CopyToStream.prototype.error = function (error) { - if (!this.readable) { + if(!this.readable) { return false; } this._error = error; - if (!this._paused) { + if(!this._paused) { this.emit('error', error); } }; + CopyToStream.prototype.close = function () { - if (!this.readable) { + if(!this.readable) { return false; } this._finished = true; - if (!this._paused) { + if(!this._paused) { this.emit("end"); } }; + CopyToStream.prototype.handleChunk = function (chunk) { var tmpBuffer; - if (!this.readable) { + if(!this.readable) { return; } - if (!this.buffer.length) { + if(!this.buffer.length) { this.buffer = chunk; } else { tmpBuffer = new Buffer(this.buffer.length + chunk.length); @@ -157,25 +174,28 @@ CopyToStream.prototype.handleChunk = function (chunk) { } this._outputDataChunk(); }; + CopyToStream.prototype.pause = function () { - if (!this.readable) { + if(!this.readable) { return false; } this._paused = true; }; + CopyToStream.prototype.resume = function () { - if (!this._paused) { + if(!this._paused) { return false; } this._paused = false; this._outputDataChunk(); - if (this._error) { + if(this._error) { return this.emit('error', this._error); } - if (this._finished) { + if(this._finished) { return this.emit('end'); } }; + CopyToStream.prototype.setEncoding = function (encoding) { this._encoding = encoding; }; diff --git a/lib/index.js b/lib/index.js index 6dab1339..bb2041bf 100644 --- a/lib/index.js +++ b/lib/index.js @@ -3,7 +3,7 @@ var util = require('util'); var Client = require(__dirname+'/client'); var defaults = require(__dirname + '/defaults'); var pool = require(__dirname + '/pool'); -var types = require(__dirname + '/types'); +var types = require(__dirname + '/types/'); var Connection = require(__dirname + '/connection'); var PG = function(clientConstructor) { diff --git a/lib/native/index.js b/lib/native/index.js index 3a1c2f90..2918689e 100644 --- a/lib/native/index.js +++ b/lib/native/index.js @@ -2,12 +2,12 @@ var EventEmitter = require('events').EventEmitter; var ConnectionParameters = require(__dirname + '/../connection-parameters'); -var utils = require(__dirname + "/../utils"); var CopyFromStream = require(__dirname + '/../copystream').CopyFromStream; var CopyToStream = require(__dirname + '/../copystream').CopyToStream; var binding; +//TODO remove on v1.0.0 try { //v0.5.x binding = require(__dirname + '/../../build/Release/binding.node'); @@ -17,18 +17,15 @@ try { } var Connection = binding.Connection; -var types = require(__dirname + "/../types"); var NativeQuery = require(__dirname + '/query'); -var EventEmitter = require('events').EventEmitter; -var p = Connection.prototype; for(var k in EventEmitter.prototype) { - p[k] = EventEmitter.prototype[k]; + Connection.prototype[k] = EventEmitter.prototype[k]; } -var nativeConnect = p.connect; +var nativeConnect = Connection.prototype.connect; -p.connect = function(cb) { +Connection.prototype.connect = function(cb) { var self = this; this.connectionParameters.getLibpqConnectionString(function(err, conString) { if(err) { @@ -52,7 +49,8 @@ p.connect = function(cb) { nativeConnect.call(self, conString); }); }; -p._copy = function (text, stream) { + +Connection.prototype._copy = function (text, stream) { var q = new NativeQuery(text, function (error) { if (error) { q.stream.error(error); @@ -65,19 +63,24 @@ p._copy = function (text, stream) { this._pulseQueryQueue(); return q.stream; }; -p.copyFrom = function (text) { + +Connection.prototype.copyFrom = function (text) { return this._copy(text, new CopyFromStream()); }; -p.copyTo = function (text) { + +Connection.prototype.copyTo = function (text) { return this._copy(text, new CopyToStream()); }; -p.sendCopyFromChunk = function (chunk) { + +Connection.prototype.sendCopyFromChunk = function (chunk) { this._sendCopyFromChunk(chunk); }; -p.endCopyFrom = function (msg) { + +Connection.prototype.endCopyFrom = function (msg) { this._endCopyFrom(msg); }; -p.query = function(config, values, callback) { + +Connection.prototype.query = function(config, values, callback) { var query = (config instanceof NativeQuery) ? config : new NativeQuery(config, values, callback); this._queryQueue.push(query); @@ -85,16 +88,17 @@ p.query = function(config, values, callback) { return query; }; -var nativeCancel = p.cancel; +var nativeCancel = Connection.prototype.cancel; -p.cancel = function(client, query) { - if (client._activeQuery == query) +Connection.prototype.cancel = function(client, query) { + if (client._activeQuery == query) { this.connect(nativeCancel.bind(client)); - else if (client._queryQueue.indexOf(query) != -1) + } else if (client._queryQueue.indexOf(query) != -1) { client._queryQueue.splice(client._queryQueue.indexOf(query), 1); + } }; -p._pulseQueryQueue = function(initialConnection) { +Connection.prototype._pulseQueryQueue = function(initialConnection) { if(!this._connected) { return; } @@ -104,6 +108,7 @@ p._pulseQueryQueue = function(initialConnection) { var query = this._queryQueue.shift(); if(!query) { if(!initialConnection) { + //TODO remove all the pause-drain stuff for v1.0 if(this._drainPaused) { this._drainPaused++; } else { @@ -121,8 +126,7 @@ p._pulseQueryQueue = function(initialConnection) { this._namedQueries[query.name] = true; this._sendPrepare(query.name, query.text, (query.values||[]).length); } - } - else if(query.values) { + } else if(query.values) { //call native function this._sendQueryWithParams(query.text, query.values); } else { @@ -131,19 +135,23 @@ p._pulseQueryQueue = function(initialConnection) { } }; -p.pauseDrain = function() { +//TODO remove all the pause-drain stuff for v1.0 +Connection.prototype.pauseDrain = function() { this._drainPaused = 1; }; -p.resumeDrain = function() { +//TODO remove all the pause-drain stuff for v1.0 +Connection.prototype.resumeDrain = function() { if(this._drainPaused > 1) { this.emit('drain'); } this._drainPaused = 0; }; -p.sendCopyFail = function(msg) { + +Connection.prototype.sendCopyFail = function(msg) { this.endCopyFrom(msg); }; + var clientBuilder = function(config) { config = config || {}; var connection = new Connection(); @@ -209,10 +217,8 @@ var clientBuilder = function(config) { }); connection.on('copyOutResponse', function(msg) { if (connection._activeQuery.stream === undefined) { - connection._activeQuery._canceledDueToError = - new Error('No destination stream defined'); - (new clientBuilder({port: connection.port, host: connection.host})) - .cancel(connection, connection._activeQuery); + connection._activeQuery._canceledDueToError = new Error('No destination stream defined'); + (new clientBuilder({port: connection.port, host: connection.host})).cancel(connection, connection._activeQuery); } }); connection.on('copyData', function (chunk) { diff --git a/lib/native/query.js b/lib/native/query.js index 73dd14a9..4abbd5f4 100644 --- a/lib/native/query.js +++ b/lib/native/query.js @@ -1,7 +1,7 @@ var EventEmitter = require('events').EventEmitter; var util = require('util'); -var types = require(__dirname + '/../types'); +var types = require(__dirname + '/../types/'); var utils = require(__dirname + '/../utils'); var Result = require(__dirname + '/../result'); @@ -14,12 +14,12 @@ var NativeQuery = function(config, values, callback) { EventEmitter.call(this); - config = utils.normalizeQueryConfig(config, values, callback); + var c = utils.normalizeQueryConfig(config, values, callback); - this.name = config.name; - this.text = config.text; - this.values = config.values; - this.callback = config.callback; + this.name = c.name; + this.text = c.text; + this.values = c.values; + this.callback = c.callback; this._result = new Result(); //normalize values @@ -32,7 +32,6 @@ var NativeQuery = function(config, values, callback) { }; util.inherits(NativeQuery, EventEmitter); -var p = NativeQuery.prototype; //maps from native rowdata into api compatible row object var mapRowData = function(row) { @@ -45,7 +44,7 @@ var mapRowData = function(row) { return result; }; -p.handleRow = function(rowData) { +NativeQuery.prototype.handleRow = function(rowData) { var row = mapRowData(rowData); if(this.callback) { this._result.addRow(row); @@ -53,7 +52,7 @@ p.handleRow = function(rowData) { this.emit('row', row, this._result); }; -p.handleError = function(error) { +NativeQuery.prototype.handleError = function(error) { if (this._canceledDueToError) { error = this._canceledDueToError; this._canceledDueToError = false; @@ -66,7 +65,7 @@ p.handleError = function(error) { } }; -p.handleReadyForQuery = function(meta) { +NativeQuery.prototype.handleReadyForQuery = function(meta) { if (this._canceledDueToError) { return this.handleError(this._canceledDueToError); } @@ -78,16 +77,23 @@ p.handleReadyForQuery = function(meta) { } this.emit('end', this._result); }; -p.streamData = function (connection) { - if ( this.stream ) this.stream.startStreamingToConnection(connection); - else connection.sendCopyFail('No source stream defined'); + +NativeQuery.prototype.streamData = function (connection) { + if(this.stream) { + this.stream.startStreamingToConnection(connection); + } + else { + connection.sendCopyFail('No source stream defined'); + } }; -p.handleCopyFromChunk = function (chunk) { - if ( this.stream ) { + +NativeQuery.prototype.handleCopyFromChunk = function (chunk) { + if(this.stream) { this.stream.handleChunk(chunk); } //if there are no stream (for example when copy to query was sent by //query method instead of copyTo) error will be handled //on copyOutResponse event, so silently ignore this error here }; + module.exports = NativeQuery; diff --git a/lib/query.js b/lib/query.js index 65ffa44b..c1b5ee5e 100644 --- a/lib/query.js +++ b/lib/query.js @@ -2,12 +2,12 @@ var EventEmitter = require('events').EventEmitter; var util = require('util'); var Result = require(__dirname + '/result'); -var Types = require(__dirname + '/types'); +var Types = require(__dirname + '/types/'); var utils = require(__dirname + '/utils'); var Query = function(config, values, callback) { // use of "new" optional - if (!(this instanceof Query)) { return new Query(config, values, callback); } + if(!(this instanceof Query)) { return new Query(config, values, callback); } config = utils.normalizeQueryConfig(config, values, callback); @@ -30,9 +30,8 @@ var Query = function(config, values, callback) { }; util.inherits(Query, EventEmitter); -var p = Query.prototype; -p.requiresPreparation = function() { +Query.prototype.requiresPreparation = function() { //named queries must always be prepared if(this.name) { return true; } //always prepare if there are max number of rows expected per @@ -55,7 +54,7 @@ var noParse = function(val) { //associates row metadata from the supplied //message with this query object //metadata used when parsing row results -p.handleRowDescription = function(msg) { +Query.prototype.handleRowDescription = function(msg) { this._fieldNames = []; this._fieldConverters = []; var len = msg.fields.length; @@ -67,7 +66,7 @@ p.handleRowDescription = function(msg) { } }; -p.handleDataRow = function(msg) { +Query.prototype.handleDataRow = function(msg) { var self = this; var row = {}; for(var i = 0; i < msg.fields.length; i++) { @@ -88,12 +87,12 @@ p.handleDataRow = function(msg) { } }; -p.handleCommandComplete = function(msg) { +Query.prototype.handleCommandComplete = function(msg) { this._result.addCommandComplete(msg); }; -p.handleReadyForQuery = function() { - if (this._canceledDueToError) { +Query.prototype.handleReadyForQuery = function() { + if(this._canceledDueToError) { return this.handleError(this._canceledDueToError); } if(this.callback) { @@ -102,8 +101,8 @@ p.handleReadyForQuery = function() { this.emit('end', this._result); }; -p.handleError = function(err) { - if (this._canceledDueToError) { +Query.prototype.handleError = function(err) { + if(this._canceledDueToError) { err = this._canceledDueToError; this._canceledDueToError = false; } @@ -117,7 +116,7 @@ p.handleError = function(err) { this.emit('end'); }; -p.submit = function(connection) { +Query.prototype.submit = function(connection) { var self = this; if(this.requiresPreparation()) { this.prepare(connection); @@ -126,11 +125,11 @@ p.submit = function(connection) { } }; -p.hasBeenParsed = function(connection) { +Query.prototype.hasBeenParsed = function(connection) { return this.name && connection.parsedStatements[this.name]; }; -p.getRows = function(connection) { +Query.prototype.getRows = function(connection) { connection.execute({ portal: this.portalName, rows: this.rows @@ -138,7 +137,7 @@ p.getRows = function(connection) { connection.flush(); }; -p.prepare = function(connection) { +Query.prototype.prepare = function(connection) { var self = this; //prepared statements need sync to be called after each command //complete or when an error is encountered @@ -177,12 +176,14 @@ p.prepare = function(connection) { this.getRows(connection); }; -p.streamData = function (connection) { - if ( this.stream ) this.stream.startStreamingToConnection(connection); + +Query.prototype.streamData = function (connection) { + if(this.stream) this.stream.startStreamingToConnection(connection); else connection.sendCopyFail('No source stream defined'); }; -p.handleCopyFromChunk = function (chunk) { - if ( this.stream ) { + +Query.prototype.handleCopyFromChunk = function (chunk) { + if(this.stream) { this.stream.handleChunk(chunk); } //if there are no stream (for example when copy to query was sent by diff --git a/lib/result.js b/lib/result.js index 68820933..fd920ed4 100644 --- a/lib/result.js +++ b/lib/result.js @@ -8,12 +8,10 @@ var Result = function() { this.rows = []; }; -var p = Result.prototype; - var matchRegexp = /([A-Za-z]+) (\d+ )?(\d+)?/; //adds a command complete message -p.addCommandComplete = function(msg) { +Result.prototype.addCommandComplete = function(msg) { var match; if(msg.text) { //pure javascript @@ -35,7 +33,7 @@ p.addCommandComplete = function(msg) { } }; -p.addRow = function(row) { +Result.prototype.addRow = function(row) { this.rows.push(row); }; diff --git a/lib/arrayParser.js b/lib/types/arrayParser.js similarity index 99% rename from lib/arrayParser.js rename to lib/types/arrayParser.js index 236a2668..96a37b93 100644 --- a/lib/arrayParser.js +++ b/lib/types/arrayParser.js @@ -11,9 +11,11 @@ function ArrayParser(source, converter) { }; } } + ArrayParser.prototype.eof = function() { return this.pos >= this.source.length; }; + ArrayParser.prototype.nextChar = function() { var c; if ((c = this.source[this.pos++]) === "\\") { @@ -28,9 +30,11 @@ ArrayParser.prototype.nextChar = function() { }; } }; + ArrayParser.prototype.record = function(c) { return this.recorded.push(c); }; + ArrayParser.prototype.newEntry = function(includeEmpty) { var entry; if (this.recorded.length > 0 || includeEmpty) { @@ -45,6 +49,7 @@ ArrayParser.prototype.newEntry = function(includeEmpty) { this.recorded = []; } }; + ArrayParser.prototype.parse = function(nested) { var c, p, quote; if (nested === null) { diff --git a/lib/binaryParsers.js b/lib/types/binaryParsers.js similarity index 100% rename from lib/binaryParsers.js rename to lib/types/binaryParsers.js diff --git a/lib/types.js b/lib/types/index.js similarity index 88% rename from lib/types.js rename to lib/types/index.js index 796f4841..d58bc992 100644 --- a/lib/types.js +++ b/lib/types/index.js @@ -1,5 +1,5 @@ -var textParsers = require(__dirname + "/textParsers"), -binaryParsers = require(__dirname + "/binaryParsers"); +var textParsers = require(__dirname + '/textParsers'); +var binaryParsers = require(__dirname + '/binaryParsers'); var typeParsers = { text: {}, diff --git a/lib/textParsers.js b/lib/types/textParsers.js similarity index 98% rename from lib/textParsers.js rename to lib/types/textParsers.js index bfb23bab..e5d2a747 100644 --- a/lib/textParsers.js +++ b/lib/types/textParsers.js @@ -167,6 +167,7 @@ var init = function(register) { register(21, parseInteger); register(23, parseInteger); register(26, parseInteger); + //TODO remove for v1.0 register(1700, function(val){ if(val.length > maxLen) { console.warn( @@ -175,7 +176,9 @@ var init = function(register) { } return parseFloat(val); }); + //TODO remove for v1.0 register(700, parseFloat); + //TODO remove for v1.0 register(701, parseFloat); register(16, parseBool); register(1082, parseDate); // date diff --git a/lib/utils.js b/lib/utils.js index 273decb8..7bbbe5b6 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -19,16 +19,16 @@ if(typeof events.EventEmitter.prototype.once !== 'function') { function arrayString(val) { var result = '{'; for (var i = 0 ; i < val.length; i++) { - if (i > 0) { + if(i > 0) { result = result + ','; } - if (val[i] instanceof Date) { + if(val[i] instanceof Date) { result = result + JSON.stringify(val[i]); } else if(typeof val[i] === 'undefined') { result = result + 'NULL'; } - else if (Array.isArray(val[i])) { + else if(Array.isArray(val[i])) { result = result + arrayString(val[i]); } else @@ -52,7 +52,7 @@ var prepareValue = function(val) { if(typeof val === 'undefined') { return null; } - if (Array.isArray(val)) { + if(Array.isArray(val)) { return arrayString(val); } return val === null ? null : val.toString(); @@ -68,7 +68,7 @@ function normalizeQueryConfig (config, values, callback) { config.values = values; } } - if (callback) { + if(callback) { config.callback = callback; } return config; diff --git a/lib/writer.js b/lib/writer.js index a6d88f38..96a5944f 100644 --- a/lib/writer.js +++ b/lib/writer.js @@ -8,10 +8,8 @@ var Writer = function(size) { this.headerPosition = 0; }; -var p = Writer.prototype; - //resizes internal buffer if not enough size left -p._ensure = function(size) { +Writer.prototype._ensure = function(size) { var remaining = this.buffer.length - this.offset; if(remaining < size) { var oldBuffer = this.buffer; @@ -20,7 +18,7 @@ p._ensure = function(size) { } }; -p.addInt32 = function(num) { +Writer.prototype.addInt32 = function(num) { this._ensure(4); this.buffer[this.offset++] = (num >>> 24 & 0xFF); this.buffer[this.offset++] = (num >>> 16 & 0xFF); @@ -29,7 +27,7 @@ p.addInt32 = function(num) { return this; }; -p.addInt16 = function(num) { +Writer.prototype.addInt16 = function(num) { this._ensure(2); this.buffer[this.offset++] = (num >>> 8 & 0xFF); this.buffer[this.offset++] = (num >>> 0 & 0xFF); @@ -48,7 +46,7 @@ if(Buffer.prototype.write.length === 3) { }; } -p.addCString = function(string) { +Writer.prototype.addCString = function(string) { //just write a 0 for empty or null strings if(!string) { this._ensure(1); @@ -63,14 +61,14 @@ p.addCString = function(string) { return this; }; -p.addChar = function(c) { +Writer.prototype.addChar = function(c) { this._ensure(1); writeString(this.buffer, c, this.offset, 1); this.offset++; return this; }; -p.addString = function(string) { +Writer.prototype.addString = function(string) { string = string || ""; var len = Buffer.byteLength(string); this._ensure(len); @@ -79,18 +77,18 @@ p.addString = function(string) { return this; }; -p.getByteLength = function() { +Writer.prototype.getByteLength = function() { return this.offset - 5; }; -p.add = function(otherBuffer) { +Writer.prototype.add = function(otherBuffer) { this._ensure(otherBuffer.length); otherBuffer.copy(this.buffer, this.offset); this.offset += otherBuffer.length; return this; }; -p.clear = function() { +Writer.prototype.clear = function() { this.offset = 5; this.headerPosition = 0; this.lastEnd = 0; @@ -98,7 +96,7 @@ p.clear = function() { //appends a header block to all the written data since the last //subsequent header or to the beginning if there is only one data block -p.addHeader = function(code, last) { +Writer.prototype.addHeader = function(code, last) { var origOffset = this.offset; this.offset = this.headerPosition; this.buffer[this.offset++] = code; @@ -114,14 +112,14 @@ p.addHeader = function(code, last) { } }; -p.join = function(code) { +Writer.prototype.join = function(code) { if(code) { this.addHeader(code, true); } return this.buffer.slice(code ? 0 : 5, this.offset); }; -p.flush = function(code) { +Writer.prototype.flush = function(code) { var result = this.join(code); this.clear(); return result;