diff --git a/lib/libpq.js b/lib/libpq.js deleted file mode 100644 index ebc3ca27..00000000 --- a/lib/libpq.js +++ /dev/null @@ -1,2 +0,0 @@ -var binding = require(__dirname + '/../build/default/binding'); -module.exports = binding; diff --git a/lib/binding.js b/lib/native.js similarity index 67% rename from lib/binding.js rename to lib/native.js index e6b6f458..16506e26 100644 --- a/lib/binding.js +++ b/lib/native.js @@ -1,4 +1,7 @@ //require the c++ bindings & export to javascript +var sys = require('sys'); +var EventEmitter = require('events').EventEmitter; + var binding = require(__dirname + '/../build/default/binding'); var utils = require(__dirname + "/utils"); var Connection = binding.Connection; @@ -21,8 +24,8 @@ var getLibpgConString = function(config, callback) { params.push("dbname='" + config.database + "'"); } if(config.host) { - if(config.host != 'localhost') { - throw new Exception("Need to use node to do async DNS on host"); + if(config.host != 'localhost' && config.host != '127.0.0.1') { + throw new Error("Need to use node to do async DNS on host"); } params.push("hostaddr=127.0.0.1 "); } @@ -44,9 +47,10 @@ p.connect = function() { } p.query = function(queryString) { - this._queryQueue.push(queryString); + var q = new NativeQuery(queryString); + this._queryQueue.push(q); this._pulseQueryQueue(); - return this; + return q; } p._pulseQueryQueue = function() { @@ -61,11 +65,11 @@ p._pulseQueryQueue = function() { this.emit('drain'); return; } - this._sendQuery(query); + this._activeQuery = query; + this._sendQuery(query.text); } var ctor = function(config) { - console.log('creating native client'); var connection = new Connection(); connection._queryQueue = []; connection._activeQuery = null; @@ -74,9 +78,21 @@ var ctor = function(config) { connection._connected = true; connection._pulseQueryQueue(); }); - connection.on('readyForQuery', function() { - this.emit('end'); - this._activeQuery = null; + + //proxy some events to active query + connection.on('_row', function(row) { + connection._activeQuery.emit('row', row); + }) + connection.on('_error', function(err) { + if(connection._activeQuery) { + connection._activeQuery.emit('error', err); + } else { + connection.emit('error', err); + } + }) + connection.on('_readyForQuery', function() { + connection._activeQuery.emit('end'); + connection._activeQuery = null; connection._pulseQueryQueue(); }); return connection; @@ -90,6 +106,16 @@ var connect = function(config, callback) { }) }; +//event emitter proxy +var NativeQuery = function(text) { + this.text = text; + EventEmitter.call(this); +}; + +sys.inherits(NativeQuery, EventEmitter); +var p = NativeQuery.prototype; + + module.exports = { Client: ctor, connect: connect diff --git a/src/binding.cc b/src/binding.cc index 2eeb511e..390b0ac3 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -7,6 +7,7 @@ #define LOG(msg) printf("%s\n",msg) #define TRACE(msg) //printf("%s\n", msg); + #define THROW(msg) return ThrowException(Exception::Error(String::New(msg))); using namespace v8; @@ -33,9 +34,9 @@ public: t->SetClassName(String::NewSymbol("Connection")); connect_symbol = NODE_PSYMBOL("connect"); - error_symbol = NODE_PSYMBOL("error"); - ready_symbol = NODE_PSYMBOL("readyForQuery"); - row_symbol = NODE_PSYMBOL("row"); + error_symbol = NODE_PSYMBOL("_error"); + ready_symbol = NODE_PSYMBOL("_readyForQuery"); + row_symbol = NODE_PSYMBOL("_row"); NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); NODE_SET_PROTOTYPE_METHOD(t, "_sendQuery", SendQuery); @@ -156,7 +157,7 @@ protected: if (!connection_) { LOG("Connection couldn't be created"); } else { - TRACE("Connect created"); + TRACE("Native connection created"); } if (PQsetnonblocking(connection_, 1) == -1) { @@ -205,6 +206,7 @@ protected: } if(connecting_) { + TRACE("Processing connecting_ io"); HandleConnectionIO(); return; } @@ -219,20 +221,7 @@ protected: if (PQisBusy(connection_) == 0) { PGresult *result; while ((result = PQgetResult(connection_))) { - int rowCount = PQntuples(result); - for(int rowNumber = 0; rowNumber < rowCount; rowNumber++) { - //create result object for this row - Local row = Object::New(); - int fieldCount = PQnfields(result); - for(int fieldNumber = 0; fieldNumber < fieldCount; fieldNumber++) { - char* fieldName = PQfname(result, fieldNumber); - row->Set(String::New(fieldName), WrapFieldValue(result, rowNumber, fieldNumber)); - } - - //not sure about what to dealloc or scope#Close here - Handle e = (Handle)row; - Emit(row_symbol, 1, &e); - } + HandleResult(result); PQclear(result); } Emit(ready_symbol, 0, NULL); @@ -257,6 +246,40 @@ protected: } } + void HandleResult(PGresult* result) + { + ExecStatusType status = PQresultStatus(result); + switch(status) { + case PGRES_TUPLES_OK: + HandleTuplesResult(result); + break; + case PGRES_FATAL_ERROR: + EmitLastError(); + break; + default: + printf("Unrecogized query status: %s\n", PQresStatus(status)); + break; + } + } + + void HandleTuplesResult(PGresult* result) + { + int rowCount = PQntuples(result); + for(int rowNumber = 0; rowNumber < rowCount; rowNumber++) { + //create result object for this row + Local row = Object::New(); + int fieldCount = PQnfields(result); + for(int fieldNumber = 0; fieldNumber < fieldCount; fieldNumber++) { + char* fieldName = PQfname(result, fieldNumber); + row->Set(String::New(fieldName), WrapFieldValue(result, rowNumber, fieldNumber)); + } + + //not sure about what to dealloc or scope#Close here + Handle e = (Handle)row; + Emit(row_symbol, 1, &e); + } + } + Handle WrapFieldValue(PGresult* result, int rowNumber, int fieldNumber) { int fieldType = PQftype(result, fieldNumber); @@ -296,7 +319,6 @@ private: StopWrite(); TRACE("Polled: PGRES_POLLING_FAILED"); EmitLastError(); - EmitError("Something happened...polling error"); break; case PGRES_POLLING_OK: TRACE("Polled: PGRES_POLLING_OK"); diff --git a/test/libpq/connection-tests.js b/test/libpq/connection-tests.js deleted file mode 100644 index 8c718565..00000000 --- a/test/libpq/connection-tests.js +++ /dev/null @@ -1,35 +0,0 @@ -var helper = require(__dirname + "/../test-helper"); -var Connection = require(__dirname + "/../../lib/libpq").Connection; - -test('calling connect without params raises error', function() { - var con = new Connection(); - var err; - try{ - con.connect(); - } catch (e) { - err = e; - } - assert.ok(err!=null); -}); - -test('connecting with wrong parameters', function() { - var con = new Connection(); - con.connect("user=asldfkj hostaddr=127.0.0.1 port=5432 dbname=asldkfj"); - assert.emits(con, 'error', function(error) { - assert.ok(error != null, "error should not be null"); - con.end(); - }); -}); - - -test('connects', function() { - var con = new Connection(); - con.connect("user=brian hostaddr=127.0.0.1 port=5432 dbname=postgres"); - assert.emits(con, 'connect', function() { - test('disconnects', function() { - assert.emits(con, 'connect', function() { - con.end(); - }) - }) - }) -}) diff --git a/test/native/connection-tests.js b/test/native/connection-tests.js new file mode 100644 index 00000000..1c9bc851 --- /dev/null +++ b/test/native/connection-tests.js @@ -0,0 +1,21 @@ +var helper = require(__dirname + "/../test-helper"); +var Client = require(__dirname + "/../../lib/native").Client; + +test('connecting with wrong parameters', function() { + var con = new Client("user=asldfkj hostaddr=127.0.0.1 port=5432 dbname=asldkfj"); + con.connect(); + assert.emits(con, 'error', function(error) { + assert.ok(error != null, "error should not be null"); + con.end(); + }); +}); + +test('connects', function() { + var con = new Client("tcp://postgres:1234@127.0.0.1:5432/postgres"); + con.connect(); + assert.emits(con, 'connect', function() { + test('disconnects', function() { + con.end(); + }) + }) +}) diff --git a/test/native/evented-api-tests.js b/test/native/evented-api-tests.js new file mode 100644 index 00000000..e4d871f0 --- /dev/null +++ b/test/native/evented-api-tests.js @@ -0,0 +1,30 @@ +var helper = require(__dirname + "/../test-helper"); +var Client = require(__dirname + "/../../lib/native").Client; + +test('connects', function() { + var client = new Client("tcp://postgres:1234@127.0.0.1:5432/postgres"); + client.connect(); + test('good query', function() { + var query = client.query("SELECT 1 as num, 'HELLO' as str"); + assert.emits(query, 'row', function(row) { + test('has integer data type', function() { + assert.strictEqual(row.num, 1); + }) + test('has string data type', function() { + assert.strictEqual(row.str, "HELLO") + }) + test('emits end AFTER row event', function() { + assert.emits(query, 'end'); + test('error query', function() { + var query = client.query("LSKDJF"); + assert.emits(query, 'error', function(err) { + assert.ok(err != null, "Should not have emitted null error"); + client.end(); + }) + }) + }) + }) + }) + +}) + diff --git a/wscript b/wscript index 5e2c8262..74c25244 100644 --- a/wscript +++ b/wscript @@ -28,4 +28,5 @@ def build(bld): obj.uselib = "PG" def test(test): - Utils.exec_command("node test/libpq/connection-tests.js") + Utils.exec_command("node test/native/connection-tests.js") + Utils.exec_command("node test/native/evented-api-tests.js")