From c57eee8661834f363dde55d9a423ad0705a94de4 Mon Sep 17 00:00:00 2001 From: brianc Date: Wed, 6 Mar 2013 10:26:40 -0600 Subject: [PATCH] normalize whitespace, add comments, and do a little house cleaning --- lib/client.js | 46 +++++++++++++++++++++-------------- lib/connection.js | 17 +++++++------ lib/copystream.js | 52 ++++++++++++++++++++-------------------- lib/native/index.js | 18 +++++++------- lib/native/query.js | 20 +++++++++------- lib/query.js | 10 ++++---- lib/types/arrayParser.js | 5 ++++ lib/types/index.js | 4 ++-- lib/types/textParsers.js | 3 +++ lib/utils.js | 10 ++++---- 10 files changed, 105 insertions(+), 80 deletions(-) diff --git a/lib/client.js b/lib/client.js index 8c937a60..bd03425a 100644 --- a/lib/client.js +++ b/lib/client.js @@ -20,18 +20,18 @@ var Client = function(config) { this.host = this.connectionParameters.host; this.password = this.connectionParameters.password; - config = config || {}; + var c = config || {}; - this.connection = config.connection || new Connection({ - stream: config.stream, - ssl: config.ssl + this.connection = c.connection || new Connection({ + stream: c.stream, + ssl: c.ssl }); this.queryQueue = []; - this.binary = config.binary || defaults.binary; + this.binary = c.binary || defaults.binary; this.encoding = 'utf8'; this.processID = null; this.secretKey = null; - this.ssl = config.ssl || false; + this.ssl = c.ssl || false; }; util.inherits(Client, EventEmitter); @@ -48,7 +48,7 @@ Client.prototype.connect = function(callback) { //once connection is established send startup message con.on('connect', function() { - if (self.ssl) { + if(self.ssl) { con.requestSsl(); } else { con.startup({ @@ -57,6 +57,7 @@ Client.prototype.connect = function(callback) { }); } }); + con.on('sslconnect', function() { con.startup({ user: self.user, @@ -89,10 +90,12 @@ Client.prototype.connect = function(callback) { con.on('rowDescription', function(msg) { self.activeQuery.handleRowDescription(msg); }); + //delegate datarow to active query con.on('dataRow', function(msg) { self.activeQuery.handleDataRow(msg); }); + //TODO should query gain access to connection? con.on('portalSuspended', function(msg) { self.activeQuery.getRows(con); @@ -106,11 +109,13 @@ Client.prototype.connect = function(callback) { con.sync(); } }); + con.on('copyInResponse', function(msg) { self.activeQuery.streamData(self.connection); }); + con.on('copyOutResponse', function(msg) { - if (self.activeQuery.stream === undefined) { + if(self.activeQuery.stream === undefined) { self.activeQuery._canceledDueToError = new Error('No destination stream defined'); //canceling query requires creation of new connection @@ -119,9 +124,11 @@ Client.prototype.connect = function(callback) { .cancel(self, self.activeQuery); } }); + con.on('copyData', function (msg) { self.activeQuery.handleCopyFromChunk(msg.chunk); }); + if (!callback) { self.emit('connect'); } else { @@ -169,7 +176,7 @@ Client.prototype.connect = function(callback) { }; Client.prototype.cancel = function(client, query) { - if (client.activeQuery == query) { + if(client.activeQuery == query) { var con = this.connection; if(this.host && this.host.indexOf('/') === 0) { @@ -182,8 +189,7 @@ Client.prototype.cancel = function(client, query) { con.on('connect', function() { con.cancel(client.processID, client.secretKey); }); - } - else if (client.queryQueue.indexOf(query) != -1) { + } else if(client.queryQueue.indexOf(query) != -1) { client.queryQueue.splice(client.queryQueue.indexOf(query), 1); } }; @@ -197,25 +203,29 @@ Client.prototype._pulseQueryQueue = function() { this.activeQuery.submit(this.connection); } else if(this.hasExecuted) { this.activeQuery = null; - if(this._drainPaused > 0) { this._drainPaused++; } - else { this.emit('drain'); } + //TODO remove pauseDrain for v1.0 + if(this._drainPaused > 0) { + this._drainPaused++; + } + else { + this.emit('drain'); + } } } }; Client.prototype._copy = function (text, stream) { - var config = {}, - query; + var config = {}; config.text = text; config.stream = stream; config.callback = function (error) { - if (error) { + if(error) { config.stream.error(error); } else { config.stream.close(); } }; - query = new Query(config); + var query = new Query(config); this.queryQueue.push(query); this._pulseQueryQueue(); return config.stream; @@ -234,7 +244,7 @@ Client.prototype.query = function(config, values, callback) { //can take in strings, config object or query object var query = (config instanceof Query) ? config : new Query(config, values, callback); - if (this.binary && !query.binary) { + if(this.binary && !query.binary) { query.binary = true; } diff --git a/lib/connection.js b/lib/connection.js index 4c128d9a..c1a120df 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -24,9 +24,9 @@ util.inherits(Connection, EventEmitter); Connection.prototype.connect = function(port, host) { - if (this.stream.readyState === 'closed') { + if(this.stream.readyState === 'closed') { this.stream.connect(port, host); - } else if (this.stream.readyState == 'open') { + } else if(this.stream.readyState == 'open') { this.emit('connect'); } @@ -48,7 +48,7 @@ Connection.prototype.connect = function(port, host) { self.emit(msg.name, msg); }); this.once('sslresponse', function(msg) { - if (msg.text == 0x53) { + if(msg.text == 0x53) { var tls = require('tls'); self.stream.removeAllListeners(); self.stream = tls.connect({ @@ -214,7 +214,7 @@ Connection.prototype.bind = function(config, more) { } } - if (config.binary) { + if(config.binary) { buffer.addInt16(1); // format codes to use binary buffer.addInt16(1); } @@ -301,7 +301,10 @@ Connection.prototype.readSslResponse = function() { this.lastOffset = this.offset; return false; } - return { name: 'sslresponse', text: this.buffer[this.offset++] }; + return { + name: 'sslresponse', + text: this.buffer[this.offset++] + }; }; Connection.prototype.parseMessage = function() { @@ -500,12 +503,12 @@ Connection.prototype.parseE = function(input) { fields[fieldType] = this.parseCString(); fieldType = this.readString(1); } - if (input.name === 'error') { + if(input.name === 'error') { // the msg is an Error instance msg = new Error(fields.M); for (item in input) { // copy input properties to the error - if (input.hasOwnProperty(item)) { + if(input.hasOwnProperty(item)) { msg[item] = input[item]; } } diff --git a/lib/copystream.js b/lib/copystream.js index bd589206..f82aadd6 100644 --- a/lib/copystream.js +++ b/lib/copystream.js @@ -19,7 +19,7 @@ CopyFromStream.prototype._writable = function () { }; CopyFromStream.prototype.startStreamingToConnection = function (connection) { - if (this._error) { + if(this._error) { return; } this._connection = connection; @@ -30,13 +30,13 @@ CopyFromStream.prototype.startStreamingToConnection = function (connection) { CopyFromStream.prototype._handleChunk = function (string, encoding) { var dataChunk, tmpBuffer; - if (string !== undefined) { - if (string instanceof Buffer) { + if(string !== undefined) { + if(string instanceof Buffer) { dataChunk = string; } else { dataChunk = new Buffer(string, encoding); } - if (this._buffer.length) { + if(this._buffer.length) { //Buffer.concat is better, but it's missing //in node v0.6.x tmpBuffer = new Buffer(this._buffer.length + dataChunk.length); @@ -53,10 +53,10 @@ CopyFromStream.prototype._handleChunk = function (string, encoding) { CopyFromStream.prototype._sendIfConnectionReady = function () { var dataSent = false; - if (this._connection) { + if(this._connection) { dataSent = this._connection.sendCopyFromChunk(this._buffer); this._buffer = new Buffer(0); - if (this._dataBuffered) { + if(this._dataBuffered) { this.emit('drain'); } this._dataBuffered = false; @@ -67,32 +67,32 @@ CopyFromStream.prototype._sendIfConnectionReady = function () { }; CopyFromStream.prototype._endIfNeedAndPossible = function () { - if (this._connection && this._finished && !this._finishedSent) { + if(this._connection && this._finished && !this._finishedSent) { this._finishedSent = true; this._connection.endCopyFrom(); } }; CopyFromStream.prototype.write = function (string, encoding) { - if (this._error || this._finished) { + if(this._error || this._finished) { return false; } return this._handleChunk.apply(this, arguments); }; CopyFromStream.prototype.end = function (string, encondig) { - if (this._error || this._finished) { + if(this._error || this._finished) { return false; } this._finished = true; - if (string !== undefined) { + if(string !== undefined) { this._handleChunk.apply(this, arguments); } this._endIfNeedAndPossible(); }; CopyFromStream.prototype.error = function (error) { - if (this._error || this._closed) { + if(this._error || this._closed) { return false; } this._error = true; @@ -100,10 +100,10 @@ CopyFromStream.prototype.error = function (error) { }; CopyFromStream.prototype.close = function () { - if (this._error || this._closed) { + if(this._error || this._closed) { return false; } - if (!this._finishedSent) { + if(!this._finishedSent) { throw new Error("seems to be error in code that uses CopyFromStream"); } this.emit("close"); @@ -122,11 +122,11 @@ var CopyToStream = function () { util.inherits(CopyToStream, Stream); CopyToStream.prototype._outputDataChunk = function () { - if (this._paused) { + if(this._paused) { return; } - if (this.buffer.length) { - if (this._encoding) { + if(this.buffer.length) { + if(this._encoding) { this.emit('data', this.buffer.toString(this._encoding)); } else { this.emit('data', this.buffer); @@ -140,31 +140,31 @@ CopyToStream.prototype._readable = function () { }; CopyToStream.prototype.error = function (error) { - if (!this.readable) { + if(!this.readable) { return false; } this._error = error; - if (!this._paused) { + if(!this._paused) { this.emit('error', error); } }; CopyToStream.prototype.close = function () { - if (!this.readable) { + if(!this.readable) { return false; } this._finished = true; - if (!this._paused) { + if(!this._paused) { this.emit("end"); } }; CopyToStream.prototype.handleChunk = function (chunk) { var tmpBuffer; - if (!this.readable) { + if(!this.readable) { return; } - if (!this.buffer.length) { + if(!this.buffer.length) { this.buffer = chunk; } else { tmpBuffer = new Buffer(this.buffer.length + chunk.length); @@ -176,22 +176,22 @@ CopyToStream.prototype.handleChunk = function (chunk) { }; CopyToStream.prototype.pause = function () { - if (!this.readable) { + if(!this.readable) { return false; } this._paused = true; }; CopyToStream.prototype.resume = function () { - if (!this._paused) { + if(!this._paused) { return false; } this._paused = false; this._outputDataChunk(); - if (this._error) { + if(this._error) { return this.emit('error', this._error); } - if (this._finished) { + if(this._finished) { return this.emit('end'); } }; diff --git a/lib/native/index.js b/lib/native/index.js index f26cd7db..2918689e 100644 --- a/lib/native/index.js +++ b/lib/native/index.js @@ -2,7 +2,6 @@ var EventEmitter = require('events').EventEmitter; var ConnectionParameters = require(__dirname + '/../connection-parameters'); -var utils = require(__dirname + "/../utils"); var CopyFromStream = require(__dirname + '/../copystream').CopyFromStream; var CopyToStream = require(__dirname + '/../copystream').CopyToStream; @@ -92,10 +91,11 @@ Connection.prototype.query = function(config, values, callback) { var nativeCancel = Connection.prototype.cancel; Connection.prototype.cancel = function(client, query) { - if (client._activeQuery == query) + if (client._activeQuery == query) { this.connect(nativeCancel.bind(client)); - else if (client._queryQueue.indexOf(query) != -1) + } else if (client._queryQueue.indexOf(query) != -1) { client._queryQueue.splice(client._queryQueue.indexOf(query), 1); + } }; Connection.prototype._pulseQueryQueue = function(initialConnection) { @@ -108,6 +108,7 @@ Connection.prototype._pulseQueryQueue = function(initialConnection) { var query = this._queryQueue.shift(); if(!query) { if(!initialConnection) { + //TODO remove all the pause-drain stuff for v1.0 if(this._drainPaused) { this._drainPaused++; } else { @@ -125,8 +126,7 @@ Connection.prototype._pulseQueryQueue = function(initialConnection) { this._namedQueries[query.name] = true; this._sendPrepare(query.name, query.text, (query.values||[]).length); } - } - else if(query.values) { + } else if(query.values) { //call native function this._sendQueryWithParams(query.text, query.values); } else { @@ -135,10 +135,12 @@ Connection.prototype._pulseQueryQueue = function(initialConnection) { } }; +//TODO remove all the pause-drain stuff for v1.0 Connection.prototype.pauseDrain = function() { this._drainPaused = 1; }; +//TODO remove all the pause-drain stuff for v1.0 Connection.prototype.resumeDrain = function() { if(this._drainPaused > 1) { this.emit('drain'); @@ -215,10 +217,8 @@ var clientBuilder = function(config) { }); connection.on('copyOutResponse', function(msg) { if (connection._activeQuery.stream === undefined) { - connection._activeQuery._canceledDueToError = - new Error('No destination stream defined'); - (new clientBuilder({port: connection.port, host: connection.host})) - .cancel(connection, connection._activeQuery); + connection._activeQuery._canceledDueToError = new Error('No destination stream defined'); + (new clientBuilder({port: connection.port, host: connection.host})).cancel(connection, connection._activeQuery); } }); connection.on('copyData', function (chunk) { diff --git a/lib/native/query.js b/lib/native/query.js index cc57d476..4abbd5f4 100644 --- a/lib/native/query.js +++ b/lib/native/query.js @@ -14,12 +14,12 @@ var NativeQuery = function(config, values, callback) { EventEmitter.call(this); - config = utils.normalizeQueryConfig(config, values, callback); + var c = utils.normalizeQueryConfig(config, values, callback); - this.name = config.name; - this.text = config.text; - this.values = config.values; - this.callback = config.callback; + this.name = c.name; + this.text = c.text; + this.values = c.values; + this.callback = c.callback; this._result = new Result(); //normalize values @@ -79,12 +79,16 @@ NativeQuery.prototype.handleReadyForQuery = function(meta) { }; NativeQuery.prototype.streamData = function (connection) { - if ( this.stream ) this.stream.startStreamingToConnection(connection); - else connection.sendCopyFail('No source stream defined'); + if(this.stream) { + this.stream.startStreamingToConnection(connection); + } + else { + connection.sendCopyFail('No source stream defined'); + } }; NativeQuery.prototype.handleCopyFromChunk = function (chunk) { - if ( this.stream ) { + if(this.stream) { this.stream.handleChunk(chunk); } //if there are no stream (for example when copy to query was sent by diff --git a/lib/query.js b/lib/query.js index c228dfb1..c1b5ee5e 100644 --- a/lib/query.js +++ b/lib/query.js @@ -7,7 +7,7 @@ var utils = require(__dirname + '/utils'); var Query = function(config, values, callback) { // use of "new" optional - if (!(this instanceof Query)) { return new Query(config, values, callback); } + if(!(this instanceof Query)) { return new Query(config, values, callback); } config = utils.normalizeQueryConfig(config, values, callback); @@ -92,7 +92,7 @@ Query.prototype.handleCommandComplete = function(msg) { }; Query.prototype.handleReadyForQuery = function() { - if (this._canceledDueToError) { + if(this._canceledDueToError) { return this.handleError(this._canceledDueToError); } if(this.callback) { @@ -102,7 +102,7 @@ Query.prototype.handleReadyForQuery = function() { }; Query.prototype.handleError = function(err) { - if (this._canceledDueToError) { + if(this._canceledDueToError) { err = this._canceledDueToError; this._canceledDueToError = false; } @@ -178,12 +178,12 @@ Query.prototype.prepare = function(connection) { }; Query.prototype.streamData = function (connection) { - if ( this.stream ) this.stream.startStreamingToConnection(connection); + if(this.stream) this.stream.startStreamingToConnection(connection); else connection.sendCopyFail('No source stream defined'); }; Query.prototype.handleCopyFromChunk = function (chunk) { - if ( this.stream ) { + if(this.stream) { this.stream.handleChunk(chunk); } //if there are no stream (for example when copy to query was sent by diff --git a/lib/types/arrayParser.js b/lib/types/arrayParser.js index 236a2668..96a37b93 100644 --- a/lib/types/arrayParser.js +++ b/lib/types/arrayParser.js @@ -11,9 +11,11 @@ function ArrayParser(source, converter) { }; } } + ArrayParser.prototype.eof = function() { return this.pos >= this.source.length; }; + ArrayParser.prototype.nextChar = function() { var c; if ((c = this.source[this.pos++]) === "\\") { @@ -28,9 +30,11 @@ ArrayParser.prototype.nextChar = function() { }; } }; + ArrayParser.prototype.record = function(c) { return this.recorded.push(c); }; + ArrayParser.prototype.newEntry = function(includeEmpty) { var entry; if (this.recorded.length > 0 || includeEmpty) { @@ -45,6 +49,7 @@ ArrayParser.prototype.newEntry = function(includeEmpty) { this.recorded = []; } }; + ArrayParser.prototype.parse = function(nested) { var c, p, quote; if (nested === null) { diff --git a/lib/types/index.js b/lib/types/index.js index af7ef994..d58bc992 100644 --- a/lib/types/index.js +++ b/lib/types/index.js @@ -1,5 +1,5 @@ -var textParsers = require(__dirname + '/textParsers'), -binaryParsers = require(__dirname + '/binaryParsers'); +var textParsers = require(__dirname + '/textParsers'); +var binaryParsers = require(__dirname + '/binaryParsers'); var typeParsers = { text: {}, diff --git a/lib/types/textParsers.js b/lib/types/textParsers.js index bfb23bab..e5d2a747 100644 --- a/lib/types/textParsers.js +++ b/lib/types/textParsers.js @@ -167,6 +167,7 @@ var init = function(register) { register(21, parseInteger); register(23, parseInteger); register(26, parseInteger); + //TODO remove for v1.0 register(1700, function(val){ if(val.length > maxLen) { console.warn( @@ -175,7 +176,9 @@ var init = function(register) { } return parseFloat(val); }); + //TODO remove for v1.0 register(700, parseFloat); + //TODO remove for v1.0 register(701, parseFloat); register(16, parseBool); register(1082, parseDate); // date diff --git a/lib/utils.js b/lib/utils.js index 273decb8..7bbbe5b6 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -19,16 +19,16 @@ if(typeof events.EventEmitter.prototype.once !== 'function') { function arrayString(val) { var result = '{'; for (var i = 0 ; i < val.length; i++) { - if (i > 0) { + if(i > 0) { result = result + ','; } - if (val[i] instanceof Date) { + if(val[i] instanceof Date) { result = result + JSON.stringify(val[i]); } else if(typeof val[i] === 'undefined') { result = result + 'NULL'; } - else if (Array.isArray(val[i])) { + else if(Array.isArray(val[i])) { result = result + arrayString(val[i]); } else @@ -52,7 +52,7 @@ var prepareValue = function(val) { if(typeof val === 'undefined') { return null; } - if (Array.isArray(val)) { + if(Array.isArray(val)) { return arrayString(val); } return val === null ? null : val.toString(); @@ -68,7 +68,7 @@ function normalizeQueryConfig (config, values, callback) { config.values = values; } } - if (callback) { + if(callback) { config.callback = callback; } return config;