diff --git a/lib/client.js b/lib/client.js index 75c50350..33f761be 100644 --- a/lib/client.js +++ b/lib/client.js @@ -171,6 +171,15 @@ Client.prototype.connect = function(callback) { } }); + con.once('end', function() { + if(self.activeQuery) { + self.activeQuery.handleError(new Error('Stream unexpectedly ended during query execution')); + self.activeQuery = null; + } + self.emit('end'); + }); + + con.on('notice', function(msg) { self.emit('notice', msg); }); diff --git a/lib/connection.js b/lib/connection.js index 2f90af74..2f79016c 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -40,6 +40,10 @@ Connection.prototype.connect = function(port, host) { self.emit('error', error); }); + this.stream.on('end', function() { + self.emit('end'); + }); + if(this.ssl) { this.stream.once('data', function(buffer) { self.setBuffer(buffer); diff --git a/lib/native/index.js b/lib/native/index.js index 2918689e..7ddc978d 100644 --- a/lib/native/index.js +++ b/lib/native/index.js @@ -198,6 +198,15 @@ var clientBuilder = function(config) { } }); + connection.on('_end', function() { + process.nextTick(function() { + if(connection._activeQuery) { + connection._activeQuery.handleError(new Error("Connection was ended during query")); + } + connection.emit('end'); + }); + }); + connection.on('_readyForQuery', function() { var q = this._activeQuery; //a named query finished being prepared diff --git a/package.json b/package.json index 556333bc..2605b581 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "deprecate": "~0.1.0" }, "devDependencies": { - "jshint": "git://github.com/jshint/jshint.git" + "jshint": "1.1.0" }, "scripts": { "test": "make test-all connectionString=pg://postgres@localhost:5432/postgres", diff --git a/src/binding.cc b/src/binding.cc index a2d2d27d..e0b087e0 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -6,7 +6,7 @@ #include #define LOG(msg) printf("%s\n",msg); -#define TRACE(msg) //printf("%s\n", msg); +#define TRACE(msg) //printf(%s\n, msg); #define THROW(msg) return ThrowException(Exception::Error(String::New(msg))); @@ -434,12 +434,15 @@ protected: if(revents & UV_READABLE) { TRACE("revents & UV_READABLE"); + TRACE("about to consume input"); if(PQconsumeInput(connection_) == 0) { + TRACE("could not read, terminating"); End(); EmitLastError(); //LOG("Something happened, consume input is 0"); return; } + TRACE("Consumed"); //declare handlescope as this method is entered via a libuv callback //and not part of the public v8 interface @@ -450,8 +453,11 @@ protected: if (!this->copyInMode_ && !this->copyOutMode_ && PQisBusy(connection_) == 0) { PGresult *result; bool didHandleResult = false; + TRACE("PQgetResult"); while ((result = PQgetResult(connection_))) { + TRACE("HandleResult"); didHandleResult = HandleResult(result); + TRACE("PQClear"); PQclear(result); if(!didHandleResult) { //this means that we are in copy in or copy out mode @@ -469,6 +475,7 @@ protected: } PGnotify *notify; + TRACE("PQnotifies"); while ((notify = PQnotifies(connection_))) { Local result = Object::New(); result->Set(channel_symbol, String::New(notify->relname)); @@ -515,6 +522,7 @@ protected: } bool HandleResult(PGresult* result) { + TRACE("PQresultStatus"); ExecStatusType status = PQresultStatus(result); switch(status) { case PGRES_TUPLES_OK: @@ -526,6 +534,7 @@ protected: break; case PGRES_FATAL_ERROR: { + TRACE("HandleErrorResult"); HandleErrorResult(result); return true; } @@ -610,8 +619,15 @@ protected: { HandleScope scope; //instantiate the return object as an Error with the summary Postgres message - Local msg = Local::Cast(Exception::Error(String::New(PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY)))); - + TRACE("ReadResultField"); + const char* errorMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); + if(!errorMessage) { + //there is no error, it has already been consumed in the last + //read-loop callback + return; + } + Local msg = Local::Cast(Exception::Error(String::New(errorMessage))); + TRACE("AttachErrorFields"); //add the other information returned by Postgres to the error object AttachErrorField(result, msg, severity_symbol, PG_DIAG_SEVERITY); AttachErrorField(result, msg, code_symbol, PG_DIAG_SQLSTATE); @@ -625,6 +641,7 @@ protected: AttachErrorField(result, msg, line_symbol, PG_DIAG_SOURCE_LINE); AttachErrorField(result, msg, routine_symbol, PG_DIAG_SOURCE_FUNCTION); Handle m = msg; + TRACE("EmitError"); Emit("_error", &m); } @@ -638,9 +655,11 @@ protected: void End() { + TRACE("stopping read & write"); StopRead(); StopWrite(); DestroyConnection(); + Emit("_end"); } private: @@ -719,7 +738,7 @@ private: void StopWrite() { TRACE("write STOP"); - if(ioInitialized_) { + if(ioInitialized_ && writing_) { uv_poll_stop(&write_watcher_); writing_ = false; } @@ -739,7 +758,7 @@ private: void StopRead() { TRACE("read STOP"); - if(ioInitialized_) { + if(ioInitialized_ && reading_) { uv_poll_stop(&read_watcher_); reading_ = false; } diff --git a/test/integration/client/error-handling-tests.js b/test/integration/client/error-handling-tests.js index b35588c5..1f02597f 100644 --- a/test/integration/client/error-handling-tests.js +++ b/test/integration/client/error-handling-tests.js @@ -158,6 +158,16 @@ test('multiple connection errors (gh#31)', function() { var badConString = "tcp://aslkdfj:oi14081@"+helper.args.host+":"+helper.args.port+"/"+helper.args.database; return false; }); - +}); + +test('query receives error on client shutdown', function() { + var client = new Client(helper.config); + client.connect(assert.calls(function() { + client.query('SELECT pg_sleep(5)', assert.calls(function(err, res) { + assert(err); + })); + client.end(); + assert.emits(client, 'end'); + })); }); diff --git a/test/integration/client/query-error-handling-tests.js b/test/integration/client/query-error-handling-tests.js new file mode 100644 index 00000000..cef84060 --- /dev/null +++ b/test/integration/client/query-error-handling-tests.js @@ -0,0 +1,24 @@ +var helper = require(__dirname + '/test-helper'); +var util = require('util'); + +test('error during query execution', function() { + var client = new Client(helper.args); + client.connect(assert.success(function() { + var sleepQuery = 'select pg_sleep(5)'; + client.query(sleepQuery, assert.calls(function(err, result) { + assert(err); + client.end(); + assert.emits(client, 'end'); + })); + var client2 = new Client(helper.args); + client2.connect(assert.success(function() { +var killIdleQuery = "SELECT procpid, (SELECT pg_terminate_backend(procpid)) AS killed FROM pg_stat_activity WHERE current_query = $1"; + client2.query(killIdleQuery, [sleepQuery], assert.calls(function(err, res) { + assert.ifError(err); + assert.equal(res.rowCount, 1); + client2.end(); + assert.emits(client2, 'end'); + })); + })); + })); +}); diff --git a/test/native/error-tests.js b/test/native/error-tests.js index 3184df57..3a932705 100644 --- a/test/native/error-tests.js +++ b/test/native/error-tests.js @@ -5,26 +5,30 @@ test('query with non-text as first parameter throws error', function() { var client = new Client(helper.config); client.connect(); assert.emits(client, 'connect', function() { - assert.throws(function() { - client.query({text:{fail: true}}); - }) client.end(); - }) -}) + assert.emits(client, 'end', function() { + assert.throws(function() { + client.query({text:{fail: true}}); + }); + }); + }); +}); test('parameterized query with non-text as first parameter throws error', function() { var client = new Client(helper.config); client.connect(); assert.emits(client, 'connect', function() { - assert.throws(function() { - client.query({ - text: {fail: true}, - values: [1, 2] - }) - }) client.end(); - }) -}) + assert.emits(client, 'end', function() { + assert.throws(function() { + client.query({ + text: {fail: true}, + values: [1, 2] + }) + }); + }); + }); +}); var connect = function(callback) { var client = new Client(helper.config); @@ -37,24 +41,28 @@ var connect = function(callback) { test('parameterized query with non-array for second value', function() { test('inline', function() { connect(function(client) { - assert.throws(function() { - client.query("SELECT *", "LKSDJF") - }) client.end(); - }) - }) + assert.emits(client, 'end', function() { + assert.throws(function() { + client.query("SELECT *", "LKSDJF") + }); + }); + }); + }); test('config', function() { connect(function(client) { - assert.throws(function() { - client.query({ - text: "SELECT *", - values: "ALSDKFJ" - }) - }) client.end(); - }) - }) -}) + assert.emits(client, 'end', function() { + assert.throws(function() { + client.query({ + text: "SELECT *", + values: "ALSDKFJ" + }); + }); + }); + }); + }); +}); diff --git a/test/unit/client/stream-and-query-error-interaction-tests.js b/test/unit/client/stream-and-query-error-interaction-tests.js new file mode 100644 index 00000000..9b02caf8 --- /dev/null +++ b/test/unit/client/stream-and-query-error-interaction-tests.js @@ -0,0 +1,26 @@ +var helper = require(__dirname + '/test-helper'); +var Connection = require(__dirname + '/../../../lib/connection'); +var Client = require(__dirname + '/../../../lib/client'); + +test('emits end when not in query', function() { + var stream = new (require('events').EventEmitter)(); + stream.write = function() { + //NOOP + } + var client = new Client({connection: new Connection({stream: stream})}); + client.connect(assert.calls(function() { + client.query('SELECT NOW()', assert.calls(function(err, result) { + assert(err); + })); + })); + assert.emits(client, 'end'); + client.connection.emit('connect'); + process.nextTick(function() { + client.connection.emit('readyForQuery'); + assert.equal(client.queryQueue.length, 0); + assert(client.activeQuery, 'client should have issued query'); + process.nextTick(function() { + stream.emit('end'); + }); + }); +});