cleaned up file structure & improved evented query api compat

This commit is contained in:
Brian Carlson 2011-02-23 20:02:51 -06:00
parent 6a2adc1feb
commit c321151325
7 changed files with 129 additions and 66 deletions

View File

@ -1,2 +0,0 @@
var binding = require(__dirname + '/../build/default/binding');
module.exports = binding;

View File

@ -1,4 +1,7 @@
//require the c++ bindings & export to javascript
var sys = require('sys');
var EventEmitter = require('events').EventEmitter;
var binding = require(__dirname + '/../build/default/binding');
var utils = require(__dirname + "/utils");
var Connection = binding.Connection;
@ -21,8 +24,8 @@ var getLibpgConString = function(config, callback) {
params.push("dbname='" + config.database + "'");
}
if(config.host) {
if(config.host != 'localhost') {
throw new Exception("Need to use node to do async DNS on host");
if(config.host != 'localhost' && config.host != '127.0.0.1') {
throw new Error("Need to use node to do async DNS on host");
}
params.push("hostaddr=127.0.0.1 ");
}
@ -44,9 +47,10 @@ p.connect = function() {
}
p.query = function(queryString) {
this._queryQueue.push(queryString);
var q = new NativeQuery(queryString);
this._queryQueue.push(q);
this._pulseQueryQueue();
return this;
return q;
}
p._pulseQueryQueue = function() {
@ -61,11 +65,11 @@ p._pulseQueryQueue = function() {
this.emit('drain');
return;
}
this._sendQuery(query);
this._activeQuery = query;
this._sendQuery(query.text);
}
var ctor = function(config) {
console.log('creating native client');
var connection = new Connection();
connection._queryQueue = [];
connection._activeQuery = null;
@ -74,9 +78,21 @@ var ctor = function(config) {
connection._connected = true;
connection._pulseQueryQueue();
});
connection.on('readyForQuery', function() {
this.emit('end');
this._activeQuery = null;
//proxy some events to active query
connection.on('_row', function(row) {
connection._activeQuery.emit('row', row);
})
connection.on('_error', function(err) {
if(connection._activeQuery) {
connection._activeQuery.emit('error', err);
} else {
connection.emit('error', err);
}
})
connection.on('_readyForQuery', function() {
connection._activeQuery.emit('end');
connection._activeQuery = null;
connection._pulseQueryQueue();
});
return connection;
@ -90,6 +106,16 @@ var connect = function(config, callback) {
})
};
//event emitter proxy
var NativeQuery = function(text) {
this.text = text;
EventEmitter.call(this);
};
sys.inherits(NativeQuery, EventEmitter);
var p = NativeQuery.prototype;
module.exports = {
Client: ctor,
connect: connect

View File

@ -7,6 +7,7 @@
#define LOG(msg) printf("%s\n",msg)
#define TRACE(msg) //printf("%s\n", msg);
#define THROW(msg) return ThrowException(Exception::Error(String::New(msg)));
using namespace v8;
@ -33,9 +34,9 @@ public:
t->SetClassName(String::NewSymbol("Connection"));
connect_symbol = NODE_PSYMBOL("connect");
error_symbol = NODE_PSYMBOL("error");
ready_symbol = NODE_PSYMBOL("readyForQuery");
row_symbol = NODE_PSYMBOL("row");
error_symbol = NODE_PSYMBOL("_error");
ready_symbol = NODE_PSYMBOL("_readyForQuery");
row_symbol = NODE_PSYMBOL("_row");
NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
NODE_SET_PROTOTYPE_METHOD(t, "_sendQuery", SendQuery);
@ -156,7 +157,7 @@ protected:
if (!connection_) {
LOG("Connection couldn't be created");
} else {
TRACE("Connect created");
TRACE("Native connection created");
}
if (PQsetnonblocking(connection_, 1) == -1) {
@ -205,6 +206,7 @@ protected:
}
if(connecting_) {
TRACE("Processing connecting_ io");
HandleConnectionIO();
return;
}
@ -219,20 +221,7 @@ protected:
if (PQisBusy(connection_) == 0) {
PGresult *result;
while ((result = PQgetResult(connection_))) {
int rowCount = PQntuples(result);
for(int rowNumber = 0; rowNumber < rowCount; rowNumber++) {
//create result object for this row
Local<Object> row = Object::New();
int fieldCount = PQnfields(result);
for(int fieldNumber = 0; fieldNumber < fieldCount; fieldNumber++) {
char* fieldName = PQfname(result, fieldNumber);
row->Set(String::New(fieldName), WrapFieldValue(result, rowNumber, fieldNumber));
}
//not sure about what to dealloc or scope#Close here
Handle<Value> e = (Handle<Value>)row;
Emit(row_symbol, 1, &e);
}
HandleResult(result);
PQclear(result);
}
Emit(ready_symbol, 0, NULL);
@ -257,6 +246,40 @@ protected:
}
}
void HandleResult(PGresult* result)
{
ExecStatusType status = PQresultStatus(result);
switch(status) {
case PGRES_TUPLES_OK:
HandleTuplesResult(result);
break;
case PGRES_FATAL_ERROR:
EmitLastError();
break;
default:
printf("Unrecogized query status: %s\n", PQresStatus(status));
break;
}
}
void HandleTuplesResult(PGresult* result)
{
int rowCount = PQntuples(result);
for(int rowNumber = 0; rowNumber < rowCount; rowNumber++) {
//create result object for this row
Local<Object> row = Object::New();
int fieldCount = PQnfields(result);
for(int fieldNumber = 0; fieldNumber < fieldCount; fieldNumber++) {
char* fieldName = PQfname(result, fieldNumber);
row->Set(String::New(fieldName), WrapFieldValue(result, rowNumber, fieldNumber));
}
//not sure about what to dealloc or scope#Close here
Handle<Value> e = (Handle<Value>)row;
Emit(row_symbol, 1, &e);
}
}
Handle<Value> WrapFieldValue(PGresult* result, int rowNumber, int fieldNumber)
{
int fieldType = PQftype(result, fieldNumber);
@ -296,7 +319,6 @@ private:
StopWrite();
TRACE("Polled: PGRES_POLLING_FAILED");
EmitLastError();
EmitError("Something happened...polling error");
break;
case PGRES_POLLING_OK:
TRACE("Polled: PGRES_POLLING_OK");

View File

@ -1,35 +0,0 @@
var helper = require(__dirname + "/../test-helper");
var Connection = require(__dirname + "/../../lib/libpq").Connection;
test('calling connect without params raises error', function() {
var con = new Connection();
var err;
try{
con.connect();
} catch (e) {
err = e;
}
assert.ok(err!=null);
});
test('connecting with wrong parameters', function() {
var con = new Connection();
con.connect("user=asldfkj hostaddr=127.0.0.1 port=5432 dbname=asldkfj");
assert.emits(con, 'error', function(error) {
assert.ok(error != null, "error should not be null");
con.end();
});
});
test('connects', function() {
var con = new Connection();
con.connect("user=brian hostaddr=127.0.0.1 port=5432 dbname=postgres");
assert.emits(con, 'connect', function() {
test('disconnects', function() {
assert.emits(con, 'connect', function() {
con.end();
})
})
})
})

View File

@ -0,0 +1,21 @@
var helper = require(__dirname + "/../test-helper");
var Client = require(__dirname + "/../../lib/native").Client;
test('connecting with wrong parameters', function() {
var con = new Client("user=asldfkj hostaddr=127.0.0.1 port=5432 dbname=asldkfj");
con.connect();
assert.emits(con, 'error', function(error) {
assert.ok(error != null, "error should not be null");
con.end();
});
});
test('connects', function() {
var con = new Client("tcp://postgres:1234@127.0.0.1:5432/postgres");
con.connect();
assert.emits(con, 'connect', function() {
test('disconnects', function() {
con.end();
})
})
})

View File

@ -0,0 +1,30 @@
var helper = require(__dirname + "/../test-helper");
var Client = require(__dirname + "/../../lib/native").Client;
test('connects', function() {
var client = new Client("tcp://postgres:1234@127.0.0.1:5432/postgres");
client.connect();
test('good query', function() {
var query = client.query("SELECT 1 as num, 'HELLO' as str");
assert.emits(query, 'row', function(row) {
test('has integer data type', function() {
assert.strictEqual(row.num, 1);
})
test('has string data type', function() {
assert.strictEqual(row.str, "HELLO")
})
test('emits end AFTER row event', function() {
assert.emits(query, 'end');
test('error query', function() {
var query = client.query("LSKDJF");
assert.emits(query, 'error', function(err) {
assert.ok(err != null, "Should not have emitted null error");
client.end();
})
})
})
})
})
})

View File

@ -28,4 +28,5 @@ def build(bld):
obj.uselib = "PG"
def test(test):
Utils.exec_command("node test/libpq/connection-tests.js")
Utils.exec_command("node test/native/connection-tests.js")
Utils.exec_command("node test/native/evented-api-tests.js")