Remove COPY TO / COPY FROM

This commit is contained in:
Brian M. Carlson 2014-10-11 14:27:03 -04:00
parent 2de9838e78
commit 0b2344b6b5
4 changed files with 4 additions and 463 deletions

View File

@ -7,8 +7,6 @@ var ConnectionParameters = require(__dirname + '/connection-parameters');
var Query = require(__dirname + '/query');
var defaults = require(__dirname + '/defaults');
var Connection = require(__dirname + '/connection');
var CopyFromStream = require(__dirname + '/copystream').CopyFromStream;
var CopyToStream = require(__dirname + '/copystream').CopyToStream;
var Client = function(config) {
EventEmitter.call(this);
@ -121,17 +119,6 @@ Client.prototype.connect = function(callback) {
self.activeQuery.handleCopyInResponse(self.connection);
});
con.on('copyOutResponse', function(msg) {
if(self.activeQuery.stream === undefined) {
self.activeQuery._canceledDueToError = new Error('No destination stream defined');
//canceling query requires creation of new connection
//look for postgres frontend/backend protocol
//TODO - this needs to die/be refactored
(new self.constructor({port: self.port, host: self.host}))
.cancel(self, self.activeQuery);
}
});
con.on('copyData', function (msg) {
self.activeQuery.handleCopyData(msg, self.connection);
});
@ -200,9 +187,8 @@ Client.prototype.getStartupConf = function() {
var params = this.connectionParameters;
var data = {
user : params.user ,
database : params.database
// client_encoding : "'".concat(params.client_encoding).concat("'")
user: params.user,
database: params.database
};
var appName = params.application_name || params.fallback_application_name;
@ -292,30 +278,12 @@ Client.prototype._pulseQueryQueue = function() {
}
};
Client.prototype._copy = function (text, stream) {
var config = {};
config.text = text;
config.stream = stream;
config.callback = function (error) {
if(error) {
config.stream.error(error);
} else {
config.stream.close();
}
};
var query = new Query(config);
this.queryQueue.push(query);
this._pulseQueryQueue();
return config.stream;
};
Client.prototype.copyFrom = function (text) {
return this._copy(text, new CopyFromStream());
throw new Error("For PostgreSQL COPY TO/COPY FROM support npm install pg-copy-streams");
};
Client.prototype.copyTo = function (text) {
return this._copy(text, new CopyToStream());
throw new Error("For PostgreSQL COPY TO/COPY FROM support npm install pg-copy-streams");
};
Client.prototype.query = function(config, values, callback) {

View File

@ -1,206 +0,0 @@
var Stream = require('stream').Stream;
var util = require('util');
var CopyFromStream = function () {
Stream.apply(this, arguments);
this._buffer = new Buffer(0);
this._connection = false;
this._finished = false;
this._finishedSent = false;
this._closed = false;
this._error = false;
this._dataBuffered = false;
this.__defineGetter__("writable", this._writable.bind(this));
};
util.inherits(CopyFromStream, Stream);
CopyFromStream.prototype._writable = function () {
return !(this._finished || this._error);
};
CopyFromStream.prototype.startStreamingToConnection = function (connection) {
if(this._error) {
return;
}
this._connection = connection;
this._sendIfConnectionReady();
this._endIfNeedAndPossible();
};
CopyFromStream.prototype._handleChunk = function (string, encoding) {
var dataChunk,
tmpBuffer;
if(string !== undefined) {
if(string instanceof Buffer) {
dataChunk = string;
} else {
dataChunk = new Buffer(string, encoding);
}
if(this._buffer.length) {
//Buffer.concat is better, but it's missing
//in node v0.6.x
tmpBuffer = new Buffer(this._buffer.length + dataChunk.length);
this._buffer.copy(tmpBuffer);
dataChunk.copy(tmpBuffer, this._buffer.length);
this._buffer = tmpBuffer;
} else {
this._buffer = dataChunk;
}
}
return this._sendIfConnectionReady();
};
CopyFromStream.prototype._sendIfConnectionReady = function () {
var dataSent = false;
if(this._connection) {
dataSent = this._connection.sendCopyFromChunk(this._buffer);
this._buffer = new Buffer(0);
if(this._dataBuffered) {
this.emit('drain');
}
this._dataBuffered = false;
} else {
this._dataBuffered = true;
}
return dataSent;
};
CopyFromStream.prototype._endIfNeedAndPossible = function () {
if(this._connection && this._finished && !this._finishedSent) {
this._finishedSent = true;
this._connection.endCopyFrom();
}
};
CopyFromStream.prototype.write = function (string, encoding) {
if(this._error || this._finished) {
return false;
}
return this._handleChunk.apply(this, arguments);
};
CopyFromStream.prototype.end = function (string, encoding) {
if(this._error || this._finished) {
return false;
}
this._finished = true;
if(string !== undefined) {
this._handleChunk.apply(this, arguments);
}
this._endIfNeedAndPossible();
};
CopyFromStream.prototype.error = function (error) {
if(this._error || this._closed) {
return false;
}
this._error = true;
this.emit('error', error);
};
CopyFromStream.prototype.close = function () {
if(this._error || this._closed) {
return false;
}
if(!this._finishedSent) {
throw new Error("seems to be error in code that uses CopyFromStream");
}
this.emit("close");
};
var CopyToStream = function () {
Stream.apply(this, arguments);
this._error = false;
this._finished = false;
this._paused = false;
this.buffer = new Buffer(0);
this._encoding = undefined;
this.__defineGetter__('readable', this._readable.bind(this));
};
util.inherits(CopyToStream, Stream);
CopyToStream.prototype._outputDataChunk = function () {
if(this._paused) {
return;
}
if(this.buffer.length) {
if(this._encoding) {
this.emit('data', this.buffer.toString(this._encoding));
} else {
this.emit('data', this.buffer);
}
this.buffer = new Buffer(0);
}
};
CopyToStream.prototype._readable = function () {
return !this._finished && !this._error;
};
CopyToStream.prototype.error = function (error) {
if(!this.readable) {
return false;
}
this._error = error;
if(!this._paused) {
this.emit('error', error);
}
};
CopyToStream.prototype.close = function () {
if(!this.readable) {
return false;
}
this._finished = true;
if(!this._paused) {
this.emit("end");
}
};
CopyToStream.prototype.handleChunk = function (chunk) {
var tmpBuffer;
if(!this.readable) {
return;
}
if(!this.buffer.length) {
this.buffer = chunk;
} else {
tmpBuffer = new Buffer(this.buffer.length + chunk.length);
this.buffer.copy(tmpBuffer);
chunk.copy(tmpBuffer, this.buffer.length);
this.buffer = tmpBuffer;
}
this._outputDataChunk();
};
CopyToStream.prototype.pause = function () {
if(!this.readable) {
return false;
}
this._paused = true;
};
CopyToStream.prototype.resume = function () {
if(!this._paused) {
return false;
}
this._paused = false;
this._outputDataChunk();
if(this._error) {
return this.emit('error', this._error);
}
if(this._finished) {
return this.emit('end');
}
};
CopyToStream.prototype.setEncoding = function (encoding) {
this._encoding = encoding;
};
module.exports = {
CopyFromStream: CopyFromStream,
CopyToStream: CopyToStream
};

View File

@ -1,99 +0,0 @@
var helper = require(__dirname + '/../test-helper');
var CopyFromStream = require(__dirname + '/../../../lib/copystream').CopyFromStream;
var ConnectionImitation = function () {
this.send = 0;
this.hasToBeSend = 0;
this.finished = 0;
};
ConnectionImitation.prototype = {
endCopyFrom: function () {
assert.ok(this.finished++ === 0, "end shoud be called only once");
assert.equal(this.send, this.hasToBeSend, "at the moment of the end all data has to be sent");
},
sendCopyFromChunk: function (chunk) {
this.send += chunk.length;
return true;
},
updateHasToBeSend: function (chunk) {
this.hasToBeSend += chunk.length;
return chunk;
}
};
var buf1 = new Buffer("asdfasd"),
buf2 = new Buffer("q03r90arf0aospd;"),
buf3 = new Buffer(542),
buf4 = new Buffer("93jfemialfjkasjlfas");
test('CopyFromStream, start streaming before data, end after data. no drain event', function () {
var stream = new CopyFromStream();
var conn = new ConnectionImitation();
stream.on('drain', function () {
assert.ok(false, "there has not be drain event");
});
stream.startStreamingToConnection(conn);
assert.ok(stream.write(conn.updateHasToBeSend(buf1)));
assert.ok(stream.write(conn.updateHasToBeSend(buf2)));
assert.ok(stream.write(conn.updateHasToBeSend(buf3)));
assert.ok(stream.writable, "stream has to be writable");
stream.end(conn.updateHasToBeSend(buf4));
assert.ok(!stream.writable, "stream has not to be writable");
stream.end();
assert.equal(conn.hasToBeSend, conn.send);
});
test('CopyFromStream, start streaming after end, end after data. drain event', function () {
var stream = new CopyFromStream();
assert.emits(stream, 'drain', function() {}, 'drain have to be emitted');
var conn = new ConnectionImitation()
assert.ok(!stream.write(conn.updateHasToBeSend(buf1)));
assert.ok(!stream.write(conn.updateHasToBeSend(buf2)));
assert.ok(!stream.write(conn.updateHasToBeSend(buf3)));
assert.ok(stream.writable, "stream has to be writable");
stream.end(conn.updateHasToBeSend(buf4));
assert.ok(!stream.writable, "stream has not to be writable");
stream.end();
stream.startStreamingToConnection(conn);
assert.equal(conn.hasToBeSend, conn.send);
});
test('CopyFromStream, start streaming between data chunks. end after data. drain event', function () {
var stream = new CopyFromStream();
var conn = new ConnectionImitation()
assert.emits(stream, 'drain', function() {}, 'drain have to be emitted');
stream.write(conn.updateHasToBeSend(buf1));
stream.write(conn.updateHasToBeSend(buf2));
stream.startStreamingToConnection(conn);
stream.write(conn.updateHasToBeSend(buf3));
assert.ok(stream.writable, "stream has to be writable");
stream.end(conn.updateHasToBeSend(buf4));
assert.equal(conn.hasToBeSend, conn.send);
assert.ok(!stream.writable, "stream has not to be writable");
stream.end();
});
test('CopyFromStream, start sreaming before end. end stream with data. drain event', function () {
var stream = new CopyFromStream();
var conn = new ConnectionImitation()
assert.emits(stream, 'drain', function() {}, 'drain have to be emitted');
stream.write(conn.updateHasToBeSend(buf1));
stream.write(conn.updateHasToBeSend(buf2));
stream.write(conn.updateHasToBeSend(buf3));
stream.startStreamingToConnection(conn);
assert.ok(stream.writable, "stream has to be writable");
stream.end(conn.updateHasToBeSend(buf4));
assert.equal(conn.hasToBeSend, conn.send);
assert.ok(!stream.writable, "stream has not to be writable");
stream.end();
});
test('CopyFromStream, start streaming after end. end with data. drain event', function(){
var stream = new CopyFromStream();
var conn = new ConnectionImitation()
assert.emits(stream, 'drain', function() {}, 'drain have to be emitted');
stream.write(conn.updateHasToBeSend(buf1));
stream.write(conn.updateHasToBeSend(buf2));
stream.write(conn.updateHasToBeSend(buf3));
stream.startStreamingToConnection(conn);
assert.ok(stream.writable, "stream has to be writable");
stream.end(conn.updateHasToBeSend(buf4));
stream.startStreamingToConnection(conn);
assert.equal(conn.hasToBeSend, conn.send);
assert.ok(!stream.writable, "stream has not to be writable");
stream.end();
});

View File

@ -1,122 +0,0 @@
var helper = require(__dirname + '/../test-helper');
var CopyToStream = require(__dirname + '/../../../lib/copystream').CopyToStream;
var DataCounter = function () {
this.sendBytes = 0;
this.recievedBytes = 0;
};
DataCounter.prototype = {
send: function (buf) {
this.sendBytes += buf.length;
return buf;
},
recieve: function (chunk) {
this.recievedBytes += chunk.length;
},
assert: function () {
assert.equal(this.sendBytes, this.recievedBytes, "data bytes send and recieved has to match");
}
};
var buf1 = new Buffer("asdfasd"),
buf2 = new Buffer("q03r90arf0aospd;"),
buf3 = new Buffer(542),
buf4 = new Buffer("93jfemialfjkasjlfas");
test('CopyToStream simple', function () {
var stream = new CopyToStream(),
dc = new DataCounter();
assert.emits(stream, 'end', function () {}, '');
stream.on('data', dc.recieve.bind(dc));
stream.handleChunk(dc.send(buf1));
stream.handleChunk(dc.send(buf2));
stream.handleChunk(dc.send(buf3));
stream.handleChunk(dc.send(buf4));
dc.assert();
stream.close();
});
test('CopyToStream pause/resume/close', function () {
var stream = new CopyToStream(),
dc = new DataCounter();
stream.on('data', dc.recieve.bind(dc));
assert.emits(stream, 'end', function () {}, 'stream has to emit end after closing');
stream.pause();
stream.handleChunk(dc.send(buf1));
stream.handleChunk(dc.send(buf2));
stream.handleChunk(dc.send(buf3));
assert.equal(dc.recievedBytes, 0);
stream.resume();
dc.assert();
stream.handleChunk(dc.send(buf2));
dc.assert();
stream.handleChunk(dc.send(buf3));
dc.assert();
stream.pause();
stream.handleChunk(dc.send(buf4));
assert(dc.sendBytes - dc.recievedBytes, buf4.length, "stream has not emit, data while it is in paused state");
stream.resume();
dc.assert();
stream.close();
});
test('CopyToStream error', function () {
var stream = new CopyToStream(),
dc = new DataCounter();
stream.on('data', dc.recieve.bind(dc));
assert.emits(stream, 'error', function () {}, 'stream has to emit error event, when error method called');
stream.handleChunk(dc.send(buf1));
stream.handleChunk(dc.send(buf2));
stream.error(new Error('test error'));
});
test('CopyToStream do not emit anything while paused', function () {
var stream = new CopyToStream();
stream.on('data', function () {
assert.ok(false, "stream has not emit data when paused");
});
stream.on('end', function () {
assert.ok(false, "stream has not emit end when paused");
});
stream.on('error', function () {
assert.ok(false, "stream has not emit end when paused");
});
stream.pause();
stream.handleChunk(buf2);
stream.close();
stream.error();
});
test('CopyToStream emit data and error after resume', function () {
var stream = new CopyToStream(),
paused;
stream.on('data', function () {
assert.ok(!paused, "stream has not emit data when paused");
});
stream.on('end', function () {
assert.ok(!paused, "stream has not emit end when paused");
});
stream.on('error', function () {
assert.ok(!paused, "stream has not emit end when paused");
});
paused = true;
stream.pause();
stream.handleChunk(buf2);
stream.error();
paused = false;
stream.resume();
});
test('CopyToStream emit data and end after resume', function () {
var stream = new CopyToStream(),
paused;
stream.on('data', function () {
assert.ok(!paused, "stream has not emit data when paused");
});
stream.on('end', function () {
assert.ok(!paused, "stream has not emit end when paused");
});
stream.on('error', function () {
assert.ok(!paused, "stream has not emit end when paused");
});
paused = true;
stream.pause();
stream.handleChunk(buf2);
stream.close();
paused = false;
stream.resume();
});