diff --git a/README.md b/README.md index cc27d747..7c77ecab 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,8 @@ Many thanks to the following: * [homme](https://github.com/homme) * [bdunavant](https://github.com/bdunavant) * [tokumine](https://github.com/tokumine) +* [shtylman](https://github.com/shtylman) +* [cricri](https://github.com/cricri) ## Documentation diff --git a/lib/client.js b/lib/client.js index 64b1b59c..891e1812 100644 --- a/lib/client.js +++ b/lib/client.js @@ -21,6 +21,8 @@ var Client = function(config) { this.queryQueue = []; this.password = config.password || defaults.password; this.encoding = 'utf8'; + this.processID = null; + this.secretKey = null; var self = this; }; @@ -59,6 +61,11 @@ p.connect = function(callback) { con.password(md5password); }); + con.once('backendKeyData', function(msg) { + self.processID = msg.processID; + self.secretKey = msg.secretKey; + }); + //hook up query handling events to connection //after the connection initially becomes ready for queries con.once('readyForQuery', function() { @@ -130,6 +137,25 @@ p.connect = function(callback) { }; +p.cancel = function(client, query) { + if (client.activeQuery == query) { + var con = this.connection; + + if(this.host && this.host.indexOf('/') === 0) { + con.connect(this.host + '/.s.PGSQL.' + this.port); + } else { + con.connect(this.port, this.host); + } + + //once connection is established send cancel message + con.on('connect', function() { + con.cancel(client.processID, client.secretKey); + }); + } + else if (client.queryQueue.indexOf(query) != -1) + client.queryQueue.splice(client.queryQueue.indexOf(query), 1); +}; + p._pulseQueryQueue = function() { if(this.readyForQuery===true) { this.activeQuery = this.queryQueue.shift(); diff --git a/lib/connection.js b/lib/connection.js index 402615a5..6d9f11fb 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -73,6 +73,23 @@ p.startup = function(config) { this.stream.write(buffer); }; +p.cancel = function(processID, secretKey) { + var bodyBuffer = this.writer + .addInt16(1234) + .addInt16(5678) + .addInt32(processID) + .addInt32(secretKey) + .addCString('').flush(); + + var length = bodyBuffer.length + 4; + + var buffer = new Writer() + .addInt32(length) + .add(bodyBuffer) + .join(); + this.stream.write(buffer); +}; + p.password = function(password) { //0x70 = 'p' this._send(0x70, this.writer.addCString(password)); diff --git a/lib/index.js b/lib/index.js index a454bcbd..d96d44c1 100644 --- a/lib/index.js +++ b/lib/index.js @@ -81,6 +81,16 @@ PG.prototype.connect = function(config, callback) { return pool.acquire(cb); } +// cancel the query runned by the given client +PG.prototype.cancel = function(config, client, query) { + var c = config; + //allow for no config to be passed + if(typeof c === 'function') + c = defaults; + var cancellingClient = new this.Client(c); + cancellingClient.cancel(client, query); +} + module.exports = new PG(Client); //lazy require native module...the native module may not have installed diff --git a/lib/native/index.js b/lib/native/index.js index 02d99df9..ae73f841 100644 --- a/lib/native/index.js +++ b/lib/native/index.js @@ -56,6 +56,15 @@ p.query = function(config, values, callback) { return q; } +var nativeCancel = p.cancel; + +p.cancel = function(client, query) { + if (client._activeQuery == query) + this.connect(nativeCancel.bind(client)); + else if (client._queryQueue.indexOf(query) != -1) + client._queryQueue.splice(client._queryQueue.indexOf(query), 1); +}; + p._pulseQueryQueue = function(initialConnection) { if(!this._connected) { return; @@ -94,8 +103,8 @@ p.pauseDrain = function() { }; p.resumeDrain = function() { - if(this._drainPaused > 1) { - this.emit('drain') + if(this._drainPaused > 1) { + this.emit('drain') }; this._drainPaused = 0; }; diff --git a/lib/writer.js b/lib/writer.js index fe539ef0..49aed26d 100644 --- a/lib/writer.js +++ b/lib/writer.js @@ -3,7 +3,7 @@ //same buffer to avoid memcpy and limit memory allocations var Writer = function(size) { this.size = size || 1024; - this.buffer = new Buffer(this.size + 5); + this.buffer = Buffer(this.size + 5); this.offset = 5; this.headerPosition = 0; }; @@ -15,7 +15,7 @@ p._ensure = function(size) { var remaining = this.buffer.length - this.offset; if(remaining < size) { var oldBuffer = this.buffer; - this.buffer = Buffer(oldBuffer.length + size); + this.buffer = new Buffer(oldBuffer.length + size); oldBuffer.copy(this.buffer); } } @@ -36,24 +36,36 @@ p.addInt16 = function(num) { return this; } +//for versions of node requiring 'length' as 3rd argument to buffer.write +var writeString = function(buffer, string, offset, len) { + buffer.write(string, offset, len); +} + +//overwrite function for older versions of node +if(Buffer.prototype.write.length === 3) { + writeString = function(buffer, string, offset, len) { + buffer.write(string, offset); + } +} + p.addCString = function(string) { //just write a 0 for empty or null strings if(!string) { this._ensure(1); - this.buffer[this.offset++] = 0; - return this; + } else { + var len = Buffer.byteLength(string); + this._ensure(len + 1); //+1 for null terminator + writeString(this.buffer, string, this.offset, len); + this.offset += len; } - var len = Buffer.byteLength(string) + 1; - this._ensure(len); - this.buffer.write(string, this.offset); - this.offset += len; - this.buffer[this.offset] = 0; //add null terminator + + this.buffer[this.offset++] = 0; // null terminator return this; } p.addChar = function(char) { this._ensure(1); - this.buffer.write(char, this.offset); + writeString(this.buffer, char, this.offset, 1); this.offset++; return this; } diff --git a/package.json b/package.json index 076ddcf3..aa8846f6 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { "name": "pg", - "version": "0.6.3", + "version": "0.6.6", "description": "PostgreSQL client - pure javascript & libpq with the same API", "keywords" : ["postgres", "pg", "libpq", "postgre", "database", "rdbms"], "homepage": "http://github.com/brianc/node-postgres", diff --git a/script/create-test-tables.js b/script/create-test-tables.js index 98307d39..d8cbc4c6 100644 --- a/script/create-test-tables.js +++ b/script/create-test-tables.js @@ -1,4 +1,3 @@ -var sys = require('utils'); var args = require(__dirname + '/../test/cli'); var pg = require(__dirname + '/../lib'); diff --git a/src/binding.cc b/src/binding.cc index 5a8815d9..8518c561 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -69,6 +69,7 @@ public: NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryWithParams", SendQueryWithParams); NODE_SET_PROTOTYPE_METHOD(t, "_sendPrepare", SendPrepare); NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryPrepared", SendQueryPrepared); + NODE_SET_PROTOTYPE_METHOD(t, "cancel", Cancel); NODE_SET_PROTOTYPE_METHOD(t, "end", End); target->Set(String::NewSymbol("Connection"), t->GetFunction()); @@ -104,6 +105,22 @@ public: return Undefined(); } + //v8 entry point into Connection#cancel + static Handle + Cancel(const Arguments& args) + { + HandleScope scope; + Connection *self = ObjectWrap::Unwrap(args.This()); + + bool success = self->Cancel(); + if(!success) { + self -> EmitLastError(); + self -> DestroyConnection(); + } + + return Undefined(); + } + //v8 entry point into Connection#_sendQuery static Handle SendQuery(const Arguments& args) @@ -267,6 +284,15 @@ protected: return PQsendQueryPrepared(connection_, name, nParams, paramValues, NULL, NULL, 0); } + int Cancel() + { + PGcancel* pgCancel = PQgetCancel(connection_); + char errbuf[256]; + int result = PQcancel(pgCancel, errbuf, 256); + PQfreeCancel(pgCancel); + return result; + } + //flushes socket void Flush() { diff --git a/test/integration/client/cancel-query-tests.js b/test/integration/client/cancel-query-tests.js new file mode 100644 index 00000000..36d5710b --- /dev/null +++ b/test/integration/client/cancel-query-tests.js @@ -0,0 +1,46 @@ +var helper = require(__dirname+"/test-helper"); + +//before running this test make sure you run the script create-test-tables +test("cancellation of a query", function() { + + var client = helper.client(); + + var qry = client.query("select name from person order by name"); + + client.on('drain', client.end.bind(client)); + + var rows1 = 0, rows2 = 0, rows3 = 0, rows4 = 0; + + var query1 = client.query(qry); + query1.on('row', function(row) { + rows1++; + }); + var query2 = client.query(qry); + query2.on('row', function(row) { + rows2++; + }); + var query3 = client.query(qry); + query3.on('row', function(row) { + rows3++; + }); + var query4 = client.query(qry); + query4.on('row', function(row) { + rows4++; + }); + + helper.pg.cancel(helper.connectionString, client, query1); + helper.pg.cancel(helper.connectionString, client, query2); + helper.pg.cancel(helper.connectionString, client, query4); + + setTimeout(function() { + assert.equal(rows1, 0); + assert.equal(rows2, 0); + assert.equal(rows4, 0); + }, 2000); + + assert.emits(query3, 'end', function() { + test("returned right number of rows", function() { + assert.equal(rows3, 26); + }); + }); +});