diff --git a/lib/index.js b/lib/index.js index 6f7377f8..953bf4ae 100644 --- a/lib/index.js +++ b/lib/index.js @@ -45,6 +45,7 @@ Client.prototype.connect = function() { fullBuffer.write(data,8); this.stream.write(fullBuffer); }); + this.stream.on('data', function(data) { var parser = new Parser(data); var result = parser.parse(); @@ -54,6 +55,9 @@ Client.prototype.connect = function() { }); }); + this.on('ReadyForQuery', function() { + self.pulseQueryQueue(); + }); }; Client.prototype.disconnect = function() { @@ -61,13 +65,21 @@ Client.prototype.disconnect = function() { this.stream.write(terminationBuffer); }; -Client.prototype.query = function() { +Client.prototype.query = function(text) { var query = new Query(); query.client = this; - this.queryQueue.push(this); + query.text = text; + this.queryQueue.push(query); return query; }; +Client.prototype.pulseQueryQueue = function() { + var query = this.queryQueue.shift(); + if(query) { + this.stream.write(query.toBuffer()); + } +}; + var Query = function() { EventEmitter.call(this); var self = this; @@ -75,8 +87,21 @@ var Query = function() { self.emit('end'); }); }; + sys.inherits(Query, EventEmitter); +Query.prototype.toBuffer = function() { + var textBuffer = new Buffer(this.text,'utf8'); + var len = textBuffer.length; + var fullBuffer = new Buffer(textBuffer.length + 5); + fullBuffer[0] = 0x51; + fullBuffer[1] = len >>> 24; + fullBuffer[2] = len >>> 16; + fullBuffer[3] = len >>> 8; + fullBuffer[4] = len >>> 0; + textBuffer.copy(fullBuffer,5,0); + return fullBuffer; +}; var Parser = function(buffer) { this.offset = 0; diff --git a/test/communication-tests.js b/test/communication-tests.js index f4e09bf1..222c7531 100644 --- a/test/communication-tests.js +++ b/test/communication-tests.js @@ -1,4 +1,5 @@ require(__dirname+'/test-helper'); + var buffers = require(__dirname+'/test-buffers'); test('client can take existing stream', function() { @@ -47,9 +48,16 @@ test('using opened stream', function() { }); test('query queue', function() { + var stream = new MemoryStream(); + stream.readyState = 'open'; - var client = new Client({stream: stream}); + + var client = new Client({ + stream: stream + }); + client.connect(); + test('new client has empty queue', function() { assert.empty(client.queryQueue); }); @@ -59,8 +67,11 @@ test('query queue', function() { assert.length(client.queryQueue, 1); }); - assert.empty(stream.packets); + test('sends query after stream emits ready for query packet', function() { + assert.empty(stream.packets); + var handled = stream.emit('data', buffers.readyForQuery()); + assert.ok(handled, "Stream should have had data handled"); + assert.length(stream.packets, 1); + }); - stream.emit('data', buffers.readyForQuery()); - assert.length(stream.packets, 1); }); diff --git a/test/test-helper.js b/test/test-helper.js index 9c7559ca..2e9c13db 100644 --- a/test/test-helper.js +++ b/test/test-helper.js @@ -110,3 +110,7 @@ MemoryStream = function() { sys.inherits(MemoryStream, EventEmitter); var p = MemoryStream.prototype; + +p.write = function(packet) { + this.packets.push(packet); +};