diff --git a/lib/client.js b/lib/client.js index 46f9eda1..18605c65 100644 --- a/lib/client.js +++ b/lib/client.js @@ -108,6 +108,12 @@ p.connect = function(callback) { con.on('copyInResponse', function(msg) { self.activeQuery.streamData(self.connection); }); + con.on('copyOutResponse', function(msg) { + if (self.activeQuery.stream === undefined) { + self.activeQuery._canceledDueToError = new Error('No destination stream defined'); + (new self.constructor(self.config)).cancel(self, self.activeQuery); + } + }); con.on('copyData', function (msg) { self.activeQuery.handleCopyFromChunk(msg.chunk); }); diff --git a/lib/native/index.js b/lib/native/index.js index fd405f29..dfb2ff3a 100644 --- a/lib/native/index.js +++ b/lib/native/index.js @@ -72,8 +72,8 @@ p.copyTo = function (text) { p.sendCopyFromChunk = function (chunk) { this._sendCopyFromChunk(chunk); }; -p.endCopyFrom = function () { - this._endCopyFrom(); +p.endCopyFrom = function (msg) { + this._endCopyFrom(msg); }; p.query = function(config, values, callback) { var query = (config instanceof NativeQuery) ? config : new NativeQuery(config, values, callback); @@ -134,7 +134,9 @@ p.resumeDrain = function() { }; this._drainPaused = 0; }; - +p.sendCopyFail = function(msg) { + this.endCopyFrom(msg); +}; var clientBuilder = function(config) { config = config || {}; var connection = new Connection(); @@ -198,6 +200,12 @@ var clientBuilder = function(config) { //start to send data from stream connection._activeQuery.streamData(connection); }); + connection.on('copyOutResponse', function(msg) { + if (connection._activeQuery.stream === undefined) { + connection._activeQuery._canceledDueToError = new Error('No destination stream defined'); + (new clientBuilder(connection.config)).cancel(connection, connection._activeQuery); + } + }); connection.on('copyData', function (chunk) { //recieve chunk from connection //move it to stream diff --git a/lib/native/query.js b/lib/native/query.js index 9b334b1a..26e1f506 100644 --- a/lib/native/query.js +++ b/lib/native/query.js @@ -26,6 +26,7 @@ var NativeQuery = function(config, values, callback) { this.values[i] = utils.prepareValue(this.values[i]); } } + this._canceledDueToError = false; }; util.inherits(NativeQuery, EventEmitter); @@ -50,6 +51,10 @@ p.handleRow = function(rowData) { }; p.handleError = function(error) { + if (this._canceledDueToError) { + error = this._canceledDueToError; + this._canceledDueToError = false; + } if(this.callback) { this.callback(error); this.callback = null; @@ -68,9 +73,15 @@ p.handleReadyForQuery = function(meta) { this.emit('end', this._result); }; p.streamData = function (connection) { - this.stream.startStreamingToConnection(connection); + if ( this.stream ) this.stream.startStreamingToConnection(connection); + else connection.sendCopyFail('No source stream defined'); }; p.handleCopyFromChunk = function (chunk) { - this.stream.handleChunk(chunk); + if ( this.stream ) { + this.stream.handleChunk(chunk); + } + //if there are no stream (for example when copy to query was sent by + //query method instead of copyTo) error will be handled + //on copyOutResponse event, so silently ignore this error here } module.exports = NativeQuery; diff --git a/lib/query.js b/lib/query.js index 5f22a66b..5c877aa9 100644 --- a/lib/query.js +++ b/lib/query.js @@ -25,6 +25,7 @@ var Query = function(config, values, callback) { this._fieldConverters = []; this._result = new Result(); this.isPreparedStatement = false; + this._canceledDueToError = false; EventEmitter.call(this); }; @@ -99,6 +100,10 @@ p.handleReadyForQuery = function() { }; p.handleError = function(err) { + if (this._canceledDueToError) { + err = this._canceledDueToError; + this._canceledDueToError = false; + } //if callback supplied do not emit error event as uncaught error //events will bubble up to node process if(this.callback) { @@ -174,10 +179,11 @@ p.streamData = function (connection) { else connection.sendCopyFail('No source stream defined'); }; p.handleCopyFromChunk = function (chunk) { - if ( this.stream ) this.stream.handleChunk(chunk); - else { - // TODO: signal the problem somehow - //this.handleError(new Error('error', 'No destination stream defined')); - } + if ( this.stream ) { + this.stream.handleChunk(chunk); + } + //if there are no stream (for example when copy to query was sent by + //query method instead of copyTo) error will be handled + //on copyOutResponse event, so silently ignore this error here } module.exports = Query; diff --git a/src/binding.cc b/src/binding.cc index 982aa969..c26adc1c 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -248,12 +248,14 @@ public: bool connecting_; bool ioInitialized_; bool copyOutMode_; + bool copyInMode_; Connection () : ObjectWrap () { connection_ = NULL; connecting_ = false; ioInitialized_ = false; copyOutMode_ = false; + copyInMode_ = false; TRACE("Initializing ev watchers"); read_watcher_.data = this; write_watcher_.data = this; @@ -278,8 +280,13 @@ public: EndCopyFrom(const Arguments& args) { HandleScope scope; Connection *self = ObjectWrap::Unwrap(args.This()); + char * error_msg = NULL; + if (args[0]->IsString()) { + error_msg = MallocCString(args[0]); + } //TODO handle errors in some way - self->EndCopyFrom(); + self->EndCopyFrom(error_msg); + free(error_msg); return Undefined(); } @@ -433,23 +440,19 @@ protected: if (this->copyOutMode_) { this->HandleCopyOut(); } - if (PQisBusy(connection_) == 0) { + if (!this->copyInMode_ && !this->copyOutMode_ && PQisBusy(connection_) == 0) { PGresult *result; bool didHandleResult = false; while ((result = PQgetResult(connection_))) { - if (PGRES_COPY_IN == PQresultStatus(result)) { - didHandleResult = false; - Emit("copyInResponse"); - PQclear(result); + didHandleResult = HandleResult(result); + PQclear(result); + if(!didHandleResult) { + //this means that we are in copy in or copy out mode + //in this situation PQgetResult will return same + //result untill all data will be read (copy out) or + //until data end notification (copy in) + //and because of this, we need to break cycle break; - } else if (PGRES_COPY_OUT == PQresultStatus(result)) { - PQclear(result); - this->copyOutMode_ = true; - didHandleResult = this->HandleCopyOut(); - } else { - HandleResult(result); - didHandleResult = true; - PQclear(result); } } //might have fired from notification @@ -479,37 +482,29 @@ protected: } bool HandleCopyOut () { char * buffer = NULL; - int copied = PQgetCopyData(connection_, &buffer, 1); - if (copied > 0) { - Buffer * chunk = Buffer::New(buffer, copied); + int copied; + Buffer * chunk; + copied = PQgetCopyData(connection_, &buffer, 1); + while (copied > 0) { + chunk = Buffer::New(buffer, copied); Handle node_chunk = chunk->handle_; Emit("copyData", &node_chunk); PQfreemem(buffer); - //result was not handled copmpletely - return false; - } else if (copied == 0) { + copied = PQgetCopyData(connection_, &buffer, 1); + } + if (copied == 0) { //wait for next read ready //result was not handled copmpletely return false; } else if (copied == -1) { - PGresult *result; - //result is handled completely this->copyOutMode_ = false; - if (PQisBusy(connection_) == 0 && (result = PQgetResult(connection_))) { - HandleResult(result); - PQclear(result); - return true; - } else { - return false; - } + return true; } else if (copied == -2) { - //TODO error handling - //result is handled with error - HandleErrorResult(NULL); + this->copyOutMode_ = false; return true; } } - void HandleResult(PGresult* result) + bool HandleResult(PGresult* result) { ExecStatusType status = PQresultStatus(result); switch(status) { @@ -517,14 +512,35 @@ protected: { HandleTuplesResult(result); EmitCommandMetaData(result); + return true; } break; case PGRES_FATAL_ERROR: - HandleErrorResult(result); + { + HandleErrorResult(result); + return true; + } break; case PGRES_COMMAND_OK: case PGRES_EMPTY_QUERY: - EmitCommandMetaData(result); + { + EmitCommandMetaData(result); + return true; + } + break; + case PGRES_COPY_IN: + { + this->copyInMode_ = true; + Emit("copyInResponse"); + return false; + } + break; + case PGRES_COPY_OUT: + { + this->copyOutMode_ = true; + Emit("copyOutResponse"); + return this->HandleCopyOut(); + } break; default: printf("YOU SHOULD NEVER SEE THIS! PLEASE OPEN AN ISSUE ON GITHUB! Unrecogized query status: %s\n", PQresStatus(status)); @@ -772,8 +788,9 @@ private: void SendCopyFromChunk(Handle chunk) { PQputCopyData(connection_, Buffer::Data(chunk), Buffer::Length(chunk)); } - void EndCopyFrom() { - PQputCopyEnd(connection_, NULL); + void EndCopyFrom(char * error_msg) { + PQputCopyEnd(connection_, error_msg); + this->copyInMode_ = false; } };