passes initial query queue tests

This commit is contained in:
brianc 2010-10-07 20:00:49 -05:00
parent 6abac08c6e
commit 3925f6fbb0
3 changed files with 46 additions and 6 deletions

View File

@ -45,6 +45,7 @@ Client.prototype.connect = function() {
fullBuffer.write(data,8);
this.stream.write(fullBuffer);
});
this.stream.on('data', function(data) {
var parser = new Parser(data);
var result = parser.parse();
@ -54,6 +55,9 @@ Client.prototype.connect = function() {
});
});
this.on('ReadyForQuery', function() {
self.pulseQueryQueue();
});
};
Client.prototype.disconnect = function() {
@ -61,13 +65,21 @@ Client.prototype.disconnect = function() {
this.stream.write(terminationBuffer);
};
Client.prototype.query = function() {
Client.prototype.query = function(text) {
var query = new Query();
query.client = this;
this.queryQueue.push(this);
query.text = text;
this.queryQueue.push(query);
return query;
};
Client.prototype.pulseQueryQueue = function() {
var query = this.queryQueue.shift();
if(query) {
this.stream.write(query.toBuffer());
}
};
var Query = function() {
EventEmitter.call(this);
var self = this;
@ -75,8 +87,21 @@ var Query = function() {
self.emit('end');
});
};
sys.inherits(Query, EventEmitter);
Query.prototype.toBuffer = function() {
var textBuffer = new Buffer(this.text,'utf8');
var len = textBuffer.length;
var fullBuffer = new Buffer(textBuffer.length + 5);
fullBuffer[0] = 0x51;
fullBuffer[1] = len >>> 24;
fullBuffer[2] = len >>> 16;
fullBuffer[3] = len >>> 8;
fullBuffer[4] = len >>> 0;
textBuffer.copy(fullBuffer,5,0);
return fullBuffer;
};
var Parser = function(buffer) {
this.offset = 0;

View File

@ -1,4 +1,5 @@
require(__dirname+'/test-helper');
var buffers = require(__dirname+'/test-buffers');
test('client can take existing stream', function() {
@ -47,9 +48,16 @@ test('using opened stream', function() {
});
test('query queue', function() {
var stream = new MemoryStream();
stream.readyState = 'open';
var client = new Client({stream: stream});
var client = new Client({
stream: stream
});
client.connect();
test('new client has empty queue', function() {
assert.empty(client.queryQueue);
});
@ -59,8 +67,11 @@ test('query queue', function() {
assert.length(client.queryQueue, 1);
});
assert.empty(stream.packets);
test('sends query after stream emits ready for query packet', function() {
assert.empty(stream.packets);
var handled = stream.emit('data', buffers.readyForQuery());
assert.ok(handled, "Stream should have had data handled");
assert.length(stream.packets, 1);
});
stream.emit('data', buffers.readyForQuery());
assert.length(stream.packets, 1);
});

View File

@ -110,3 +110,7 @@ MemoryStream = function() {
sys.inherits(MemoryStream, EventEmitter);
var p = MemoryStream.prototype;
p.write = function(packet) {
this.packets.push(packet);
};