diff --git a/lib/client.js b/lib/client.js index d6f7a0bb..5759706e 100644 --- a/lib/client.js +++ b/lib/client.js @@ -7,8 +7,6 @@ var ConnectionParameters = require(__dirname + '/connection-parameters'); var Query = require(__dirname + '/query'); var defaults = require(__dirname + '/defaults'); var Connection = require(__dirname + '/connection'); -var CopyFromStream = require(__dirname + '/copystream').CopyFromStream; -var CopyToStream = require(__dirname + '/copystream').CopyToStream; var Client = function(config) { EventEmitter.call(this); @@ -121,17 +119,6 @@ Client.prototype.connect = function(callback) { self.activeQuery.handleCopyInResponse(self.connection); }); - con.on('copyOutResponse', function(msg) { - if(self.activeQuery.stream === undefined) { - self.activeQuery._canceledDueToError = new Error('No destination stream defined'); - //canceling query requires creation of new connection - //look for postgres frontend/backend protocol - //TODO - this needs to die/be refactored - (new self.constructor({port: self.port, host: self.host})) - .cancel(self, self.activeQuery); - } - }); - con.on('copyData', function (msg) { self.activeQuery.handleCopyData(msg, self.connection); }); @@ -200,9 +187,8 @@ Client.prototype.getStartupConf = function() { var params = this.connectionParameters; var data = { - user : params.user , - database : params.database - // client_encoding : "'".concat(params.client_encoding).concat("'") + user: params.user, + database: params.database }; var appName = params.application_name || params.fallback_application_name; @@ -292,30 +278,12 @@ Client.prototype._pulseQueryQueue = function() { } }; -Client.prototype._copy = function (text, stream) { - var config = {}; - config.text = text; - config.stream = stream; - config.callback = function (error) { - if(error) { - config.stream.error(error); - } else { - config.stream.close(); - } - }; - var query = new Query(config); - this.queryQueue.push(query); - this._pulseQueryQueue(); - return config.stream; - -}; - Client.prototype.copyFrom = function (text) { - return this._copy(text, new CopyFromStream()); + throw new Error("For PostgreSQL COPY TO/COPY FROM support npm install pg-copy-streams"); }; Client.prototype.copyTo = function (text) { - return this._copy(text, new CopyToStream()); + throw new Error("For PostgreSQL COPY TO/COPY FROM support npm install pg-copy-streams"); }; Client.prototype.query = function(config, values, callback) { diff --git a/lib/copystream.js b/lib/copystream.js deleted file mode 100644 index bcf678c2..00000000 --- a/lib/copystream.js +++ /dev/null @@ -1,206 +0,0 @@ -var Stream = require('stream').Stream; -var util = require('util'); -var CopyFromStream = function () { - Stream.apply(this, arguments); - this._buffer = new Buffer(0); - this._connection = false; - this._finished = false; - this._finishedSent = false; - this._closed = false; - this._error = false; - this._dataBuffered = false; - this.__defineGetter__("writable", this._writable.bind(this)); -}; - -util.inherits(CopyFromStream, Stream); - -CopyFromStream.prototype._writable = function () { - return !(this._finished || this._error); -}; - -CopyFromStream.prototype.startStreamingToConnection = function (connection) { - if(this._error) { - return; - } - this._connection = connection; - this._sendIfConnectionReady(); - this._endIfNeedAndPossible(); -}; - -CopyFromStream.prototype._handleChunk = function (string, encoding) { - var dataChunk, - tmpBuffer; - if(string !== undefined) { - if(string instanceof Buffer) { - dataChunk = string; - } else { - dataChunk = new Buffer(string, encoding); - } - if(this._buffer.length) { - //Buffer.concat is better, but it's missing - //in node v0.6.x - tmpBuffer = new Buffer(this._buffer.length + dataChunk.length); - this._buffer.copy(tmpBuffer); - dataChunk.copy(tmpBuffer, this._buffer.length); - this._buffer = tmpBuffer; - } else { - this._buffer = dataChunk; - } - } - - return this._sendIfConnectionReady(); -}; - -CopyFromStream.prototype._sendIfConnectionReady = function () { - var dataSent = false; - if(this._connection) { - dataSent = this._connection.sendCopyFromChunk(this._buffer); - this._buffer = new Buffer(0); - if(this._dataBuffered) { - this.emit('drain'); - } - this._dataBuffered = false; - } else { - this._dataBuffered = true; - } - return dataSent; -}; - -CopyFromStream.prototype._endIfNeedAndPossible = function () { - if(this._connection && this._finished && !this._finishedSent) { - this._finishedSent = true; - this._connection.endCopyFrom(); - } -}; - -CopyFromStream.prototype.write = function (string, encoding) { - if(this._error || this._finished) { - return false; - } - return this._handleChunk.apply(this, arguments); -}; - -CopyFromStream.prototype.end = function (string, encoding) { - if(this._error || this._finished) { - return false; - } - this._finished = true; - if(string !== undefined) { - this._handleChunk.apply(this, arguments); - } - this._endIfNeedAndPossible(); -}; - -CopyFromStream.prototype.error = function (error) { - if(this._error || this._closed) { - return false; - } - this._error = true; - this.emit('error', error); -}; - -CopyFromStream.prototype.close = function () { - if(this._error || this._closed) { - return false; - } - if(!this._finishedSent) { - throw new Error("seems to be error in code that uses CopyFromStream"); - } - this.emit("close"); -}; - -var CopyToStream = function () { - Stream.apply(this, arguments); - this._error = false; - this._finished = false; - this._paused = false; - this.buffer = new Buffer(0); - this._encoding = undefined; - this.__defineGetter__('readable', this._readable.bind(this)); -}; - -util.inherits(CopyToStream, Stream); - -CopyToStream.prototype._outputDataChunk = function () { - if(this._paused) { - return; - } - if(this.buffer.length) { - if(this._encoding) { - this.emit('data', this.buffer.toString(this._encoding)); - } else { - this.emit('data', this.buffer); - } - this.buffer = new Buffer(0); - } -}; - -CopyToStream.prototype._readable = function () { - return !this._finished && !this._error; -}; - -CopyToStream.prototype.error = function (error) { - if(!this.readable) { - return false; - } - this._error = error; - if(!this._paused) { - this.emit('error', error); - } -}; - -CopyToStream.prototype.close = function () { - if(!this.readable) { - return false; - } - this._finished = true; - if(!this._paused) { - this.emit("end"); - } -}; - -CopyToStream.prototype.handleChunk = function (chunk) { - var tmpBuffer; - if(!this.readable) { - return; - } - if(!this.buffer.length) { - this.buffer = chunk; - } else { - tmpBuffer = new Buffer(this.buffer.length + chunk.length); - this.buffer.copy(tmpBuffer); - chunk.copy(tmpBuffer, this.buffer.length); - this.buffer = tmpBuffer; - } - this._outputDataChunk(); -}; - -CopyToStream.prototype.pause = function () { - if(!this.readable) { - return false; - } - this._paused = true; -}; - -CopyToStream.prototype.resume = function () { - if(!this._paused) { - return false; - } - this._paused = false; - this._outputDataChunk(); - if(this._error) { - return this.emit('error', this._error); - } - if(this._finished) { - return this.emit('end'); - } -}; - -CopyToStream.prototype.setEncoding = function (encoding) { - this._encoding = encoding; -}; - -module.exports = { - CopyFromStream: CopyFromStream, - CopyToStream: CopyToStream -}; diff --git a/test/unit/copystream/copyfrom-tests.js b/test/unit/copystream/copyfrom-tests.js deleted file mode 100644 index 7b96049e..00000000 --- a/test/unit/copystream/copyfrom-tests.js +++ /dev/null @@ -1,99 +0,0 @@ -var helper = require(__dirname + '/../test-helper'); -var CopyFromStream = require(__dirname + '/../../../lib/copystream').CopyFromStream; -var ConnectionImitation = function () { - this.send = 0; - this.hasToBeSend = 0; - this.finished = 0; -}; -ConnectionImitation.prototype = { - endCopyFrom: function () { - assert.ok(this.finished++ === 0, "end shoud be called only once"); - assert.equal(this.send, this.hasToBeSend, "at the moment of the end all data has to be sent"); - }, - sendCopyFromChunk: function (chunk) { - this.send += chunk.length; - return true; - }, - updateHasToBeSend: function (chunk) { - this.hasToBeSend += chunk.length; - return chunk; - } -}; -var buf1 = new Buffer("asdfasd"), - buf2 = new Buffer("q03r90arf0aospd;"), - buf3 = new Buffer(542), - buf4 = new Buffer("93jfemialfjkasjlfas"); - -test('CopyFromStream, start streaming before data, end after data. no drain event', function () { - var stream = new CopyFromStream(); - var conn = new ConnectionImitation(); - stream.on('drain', function () { - assert.ok(false, "there has not be drain event"); - }); - stream.startStreamingToConnection(conn); - assert.ok(stream.write(conn.updateHasToBeSend(buf1))); - assert.ok(stream.write(conn.updateHasToBeSend(buf2))); - assert.ok(stream.write(conn.updateHasToBeSend(buf3))); - assert.ok(stream.writable, "stream has to be writable"); - stream.end(conn.updateHasToBeSend(buf4)); - assert.ok(!stream.writable, "stream has not to be writable"); - stream.end(); - assert.equal(conn.hasToBeSend, conn.send); -}); -test('CopyFromStream, start streaming after end, end after data. drain event', function () { - var stream = new CopyFromStream(); - assert.emits(stream, 'drain', function() {}, 'drain have to be emitted'); - var conn = new ConnectionImitation() - assert.ok(!stream.write(conn.updateHasToBeSend(buf1))); - assert.ok(!stream.write(conn.updateHasToBeSend(buf2))); - assert.ok(!stream.write(conn.updateHasToBeSend(buf3))); - assert.ok(stream.writable, "stream has to be writable"); - stream.end(conn.updateHasToBeSend(buf4)); - assert.ok(!stream.writable, "stream has not to be writable"); - stream.end(); - stream.startStreamingToConnection(conn); - assert.equal(conn.hasToBeSend, conn.send); -}); -test('CopyFromStream, start streaming between data chunks. end after data. drain event', function () { - var stream = new CopyFromStream(); - var conn = new ConnectionImitation() - assert.emits(stream, 'drain', function() {}, 'drain have to be emitted'); - stream.write(conn.updateHasToBeSend(buf1)); - stream.write(conn.updateHasToBeSend(buf2)); - stream.startStreamingToConnection(conn); - stream.write(conn.updateHasToBeSend(buf3)); - assert.ok(stream.writable, "stream has to be writable"); - stream.end(conn.updateHasToBeSend(buf4)); - assert.equal(conn.hasToBeSend, conn.send); - assert.ok(!stream.writable, "stream has not to be writable"); - stream.end(); -}); -test('CopyFromStream, start sreaming before end. end stream with data. drain event', function () { - var stream = new CopyFromStream(); - var conn = new ConnectionImitation() - assert.emits(stream, 'drain', function() {}, 'drain have to be emitted'); - stream.write(conn.updateHasToBeSend(buf1)); - stream.write(conn.updateHasToBeSend(buf2)); - stream.write(conn.updateHasToBeSend(buf3)); - stream.startStreamingToConnection(conn); - assert.ok(stream.writable, "stream has to be writable"); - stream.end(conn.updateHasToBeSend(buf4)); - assert.equal(conn.hasToBeSend, conn.send); - assert.ok(!stream.writable, "stream has not to be writable"); - stream.end(); -}); -test('CopyFromStream, start streaming after end. end with data. drain event', function(){ - var stream = new CopyFromStream(); - var conn = new ConnectionImitation() - assert.emits(stream, 'drain', function() {}, 'drain have to be emitted'); - stream.write(conn.updateHasToBeSend(buf1)); - stream.write(conn.updateHasToBeSend(buf2)); - stream.write(conn.updateHasToBeSend(buf3)); - stream.startStreamingToConnection(conn); - assert.ok(stream.writable, "stream has to be writable"); - stream.end(conn.updateHasToBeSend(buf4)); - stream.startStreamingToConnection(conn); - assert.equal(conn.hasToBeSend, conn.send); - assert.ok(!stream.writable, "stream has not to be writable"); - stream.end(); -}); diff --git a/test/unit/copystream/copyto-tests.js b/test/unit/copystream/copyto-tests.js deleted file mode 100644 index 7a6255b7..00000000 --- a/test/unit/copystream/copyto-tests.js +++ /dev/null @@ -1,122 +0,0 @@ -var helper = require(__dirname + '/../test-helper'); -var CopyToStream = require(__dirname + '/../../../lib/copystream').CopyToStream; -var DataCounter = function () { - this.sendBytes = 0; - this.recievedBytes = 0; -}; -DataCounter.prototype = { - send: function (buf) { - this.sendBytes += buf.length; - return buf; - }, - recieve: function (chunk) { - this.recievedBytes += chunk.length; - }, - assert: function () { - assert.equal(this.sendBytes, this.recievedBytes, "data bytes send and recieved has to match"); - } -}; -var buf1 = new Buffer("asdfasd"), - buf2 = new Buffer("q03r90arf0aospd;"), - buf3 = new Buffer(542), - buf4 = new Buffer("93jfemialfjkasjlfas"); -test('CopyToStream simple', function () { - var stream = new CopyToStream(), - dc = new DataCounter(); - assert.emits(stream, 'end', function () {}, ''); - stream.on('data', dc.recieve.bind(dc)); - stream.handleChunk(dc.send(buf1)); - stream.handleChunk(dc.send(buf2)); - stream.handleChunk(dc.send(buf3)); - stream.handleChunk(dc.send(buf4)); - dc.assert(); - stream.close(); -}); -test('CopyToStream pause/resume/close', function () { - var stream = new CopyToStream(), - dc = new DataCounter(); - stream.on('data', dc.recieve.bind(dc)); - assert.emits(stream, 'end', function () {}, 'stream has to emit end after closing'); - stream.pause(); - stream.handleChunk(dc.send(buf1)); - stream.handleChunk(dc.send(buf2)); - stream.handleChunk(dc.send(buf3)); - assert.equal(dc.recievedBytes, 0); - stream.resume(); - dc.assert(); - stream.handleChunk(dc.send(buf2)); - dc.assert(); - stream.handleChunk(dc.send(buf3)); - dc.assert(); - stream.pause(); - stream.handleChunk(dc.send(buf4)); - assert(dc.sendBytes - dc.recievedBytes, buf4.length, "stream has not emit, data while it is in paused state"); - stream.resume(); - dc.assert(); - stream.close(); -}); -test('CopyToStream error', function () { - var stream = new CopyToStream(), - dc = new DataCounter(); - stream.on('data', dc.recieve.bind(dc)); - assert.emits(stream, 'error', function () {}, 'stream has to emit error event, when error method called'); - stream.handleChunk(dc.send(buf1)); - stream.handleChunk(dc.send(buf2)); - stream.error(new Error('test error')); -}); -test('CopyToStream do not emit anything while paused', function () { - var stream = new CopyToStream(); - stream.on('data', function () { - assert.ok(false, "stream has not emit data when paused"); - }); - stream.on('end', function () { - assert.ok(false, "stream has not emit end when paused"); - }); - stream.on('error', function () { - assert.ok(false, "stream has not emit end when paused"); - }); - stream.pause(); - stream.handleChunk(buf2); - stream.close(); - stream.error(); -}); -test('CopyToStream emit data and error after resume', function () { - var stream = new CopyToStream(), - paused; - stream.on('data', function () { - assert.ok(!paused, "stream has not emit data when paused"); - }); - stream.on('end', function () { - assert.ok(!paused, "stream has not emit end when paused"); - }); - stream.on('error', function () { - assert.ok(!paused, "stream has not emit end when paused"); - }); - paused = true; - stream.pause(); - stream.handleChunk(buf2); - stream.error(); - paused = false; - stream.resume(); -}); -test('CopyToStream emit data and end after resume', function () { - var stream = new CopyToStream(), - paused; - stream.on('data', function () { - assert.ok(!paused, "stream has not emit data when paused"); - }); - stream.on('end', function () { - assert.ok(!paused, "stream has not emit end when paused"); - }); - stream.on('error', function () { - assert.ok(!paused, "stream has not emit end when paused"); - }); - paused = true; - stream.pause(); - stream.handleChunk(buf2); - stream.close(); - paused = false; - stream.resume(); -}); - -