diff --git a/lib/client.js b/lib/client.js index eb04e6bd..46f9eda1 100644 --- a/lib/client.js +++ b/lib/client.js @@ -6,7 +6,8 @@ var Query = require(__dirname + '/query'); var utils = require(__dirname + '/utils'); var defaults = require(__dirname + '/defaults'); var Connection = require(__dirname + '/connection'); - +var CopyFromStream = require(__dirname + '/copystream').CopyFromStream; +var CopyToStream = require(__dirname + '/copystream').CopyToStream; var Client = function(config) { EventEmitter.call(this); if(typeof config === 'string') { @@ -104,7 +105,12 @@ p.connect = function(callback) { con.sync(); } }); - + con.on('copyInResponse', function(msg) { + self.activeQuery.streamData(self.connection); + }); + con.on('copyData', function (msg) { + self.activeQuery.handleCopyFromChunk(msg.chunk); + }); if (!callback) { self.emit('connect'); } else { @@ -184,7 +190,30 @@ p._pulseQueryQueue = function() { } } }; +p._copy = function (text, stream) { + var config = {}, + query; + config.text = text; + config.stream = stream; + config.callback = function (error) { + if (error) { + config.stream.error(error); + } else { + config.stream.close(); + } + } + query = new Query(config); + this.queryQueue.push(query); + this._pulseQueryQueue(); + return config.stream; +}; +p.copyFrom = function (text) { + return this._copy(text, new CopyFromStream()); +} +p.copyTo = function (text) { + return this._copy(text, new CopyToStream()); +} p.query = function(config, values, callback) { //can take in strings, config object or query object var query = (config instanceof Query) ? config : new Query(config, values, callback); diff --git a/lib/connection.js b/lib/connection.js index 8aa4fa73..49722c2d 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -261,7 +261,12 @@ p.describe = function(msg, more) { this.writer.addCString(msg.type + (msg.name || '')); this._send(0x44, more); }; - +p.sendCopyFromChunk = function (chunk) { + this.stream.write(this.writer.add(chunk).flush(0x64)); +} +p.endCopyFrom = function () { + this.stream.write(this.writer.add(emptyBuffer).flush(0x63)); +} //parsing methods p.setBuffer = function(buffer) { if(this.lastBuffer) { //we have unfinished biznaz @@ -311,7 +316,6 @@ p.parseMessage = function() { var msg = { length: length }; - switch(id) { @@ -375,6 +379,21 @@ p.parseMessage = function() { msg.name = 'portalSuspended'; return msg; + case 0x47: //G + msg.name = 'copyInResponse'; + return this.parseGH(msg); + + case 0x48: //H + msg.name = 'copyOutResponse'; + return this.parseGH(msg); + case 0x63: //c + msg.name = 'copyDone'; + return msg; + + case 0x64: //d + msg.name = 'copyData'; + return this.parsed(msg); + default: throw new Error("Unrecognized message code " + id); } @@ -505,7 +524,20 @@ p.parseA = function(msg) { msg.payload = this.parseCString(); return msg; }; - +p.parseGH = function (msg) { + msg.binary = Boolean(this.parseInt8()); + var columnCount = this.parseInt16(); + msg.columnTypes = []; + for(var i = 0; i #include +#include #include #include #include @@ -65,7 +66,6 @@ public: payload_symbol = NODE_PSYMBOL("payload"); command_symbol = NODE_PSYMBOL("command"); - NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); NODE_SET_PROTOTYPE_METHOD(t, "_sendQuery", SendQuery); NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryWithParams", SendQueryWithParams); @@ -73,7 +73,8 @@ public: NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryPrepared", SendQueryPrepared); NODE_SET_PROTOTYPE_METHOD(t, "cancel", Cancel); NODE_SET_PROTOTYPE_METHOD(t, "end", End); - + NODE_SET_PROTOTYPE_METHOD(t, "_sendCopyFromChunk", SendCopyFromChunk); + NODE_SET_PROTOTYPE_METHOD(t, "_endCopyFrom", EndCopyFrom); target->Set(String::NewSymbol("Connection"), t->GetFunction()); TRACE("created class"); } @@ -246,12 +247,13 @@ public: PGconn *connection_; bool connecting_; bool ioInitialized_; + bool copyOutMode_; Connection () : ObjectWrap () { connection_ = NULL; connecting_ = false; ioInitialized_ = false; - + copyOutMode_ = false; TRACE("Initializing ev watchers"); read_watcher_.data = this; write_watcher_.data = this; @@ -261,6 +263,26 @@ public: { } + static Handle + SendCopyFromChunk(const Arguments& args) { + HandleScope scope; + Connection *self = ObjectWrap::Unwrap(args.This()); + //TODO handle errors in some way + if (args.Length() < 1 && !Buffer::HasInstance(args[0])) { + THROW("SendCopyFromChunk requires 1 Buffer argument"); + } + self->SendCopyFromChunk(args[0]->ToObject()); + return Undefined(); + } + static Handle + EndCopyFrom(const Arguments& args) { + HandleScope scope; + Connection *self = ObjectWrap::Unwrap(args.This()); + //TODO handle errors in some way + self->EndCopyFrom(); + return Undefined(); + } + protected: //v8 entry point to constructor static Handle @@ -408,14 +430,27 @@ protected: //declare handlescope as this method is entered via a libuv callback //and not part of the public v8 interface HandleScope scope; - + if (this->copyOutMode_) { + this->HandleCopyOut(); + } if (PQisBusy(connection_) == 0) { PGresult *result; bool didHandleResult = false; while ((result = PQgetResult(connection_))) { - HandleResult(result); - didHandleResult = true; - PQclear(result); + if (PGRES_COPY_IN == PQresultStatus(result)) { + didHandleResult = false; + Emit("_copyInResponse"); + PQclear(result); + break; + } else if (PGRES_COPY_OUT == PQresultStatus(result)) { + PQclear(result); + this->copyOutMode_ = true; + didHandleResult = this->HandleCopyOut(); + } else { + HandleResult(result); + didHandleResult = true; + PQclear(result); + } } //might have fired from notification if(didHandleResult) { @@ -442,7 +477,37 @@ protected: } } } - + bool HandleCopyOut () { + char * buffer = NULL; + int copied = PQgetCopyData(connection_, &buffer, 1); + if (copied > 0) { + Buffer * chunk = Buffer::New(buffer, copied); + Handle node_chunk = chunk->handle_; + Emit("_copyData", &node_chunk); + PQfreemem(buffer); + //result was not handled copmpletely + return false; + } else if (copied == 0) { + //wait for next read ready + //result was not handled copmpletely + return false; + } else if (copied == -1) { + PGresult *result; + //result is handled completely + this->copyOutMode_ = false; + if (PQisBusy(connection_) == 0 && (result = PQgetResult(connection_))) { + HandleResult(result); + PQclear(result); + return true; + } else { + return false; + } + } else if (copied == -2) { + //TODO error handling + //result is handled with error + return true; + } + } void HandleResult(PGresult* result) { ExecStatusType status = PQresultStatus(result); @@ -703,6 +768,13 @@ private: strcpy(cString, *utf8String); return cString; } + void SendCopyFromChunk(Handle chunk) { + PQputCopyData(connection_, Buffer::Data(chunk), Buffer::Length(chunk)); + } + void EndCopyFrom() { + PQputCopyEnd(connection_, NULL); + } + };