cleaned up source

This commit is contained in:
brianc 2011-02-20 16:53:21 -06:00
parent 55041fffc7
commit 2e2aa0083b

View File

@ -5,6 +5,8 @@
#include <stdlib.h>
#define LOG(msg) printf("%s\n",msg)
#define TRACE(msg) printf("%s\n", msg);
#define THROW(msg) return ThrowException(Exception::Error(String::New(msg)));
using namespace v8;
@ -16,6 +18,7 @@ class Connection : public EventEmitter {
public:
//creates the V8 objects & attaches them to the module (target)
static void
Init (Handle<Object> target)
{
@ -35,16 +38,16 @@ public:
LOG("created class");
}
//static function called by libev as callback entrypoint
static void
io_event(EV_P_ ev_io *w, int revents)
{
LOG("Received IO event");
TRACE("Received IO event");
Connection *connection = static_cast<Connection*>(w->data);
connection->HandleIOEvent(revents);
//ev_io_stop(EV_A w);
}
//v8 entry point into Connection#connect
static Handle<Value>
Connect(const Arguments& args)
{
@ -61,6 +64,7 @@ public:
return Undefined();
}
//v8 entry point into Connection#_sendQuery
static Handle<Value>
SendQuery(const Arguments& args)
{
@ -72,24 +76,11 @@ public:
if(result == 0) {
THROW("PQsendQuery returned error code");
}
//TODO should we flush before throw?
self->Flush();
return Undefined();
}
int Send(const char *queryText)
{
return PQsendQuery(connection_, queryText);
}
void Flush()
{
if(PQflush(connection_) == 1) {
ev_io_start(EV_DEFAULT_ &write_watcher_);
}
}
ev_io read_watcher_;
ev_io write_watcher_;
PGconn *connection_;
@ -110,30 +101,35 @@ public:
{
}
void StopWrite()
protected:
//v8 entry point to constructor
static Handle<Value>
New (const Arguments& args)
{
LOG("Stoping write watcher");
ev_io_stop(EV_DEFAULT_ &write_watcher_);
HandleScope scope;
Connection *connection = new Connection();
connection->Wrap(args.This());
return args.This();
}
void StartWrite()
int Send(const char *queryText)
{
LOG("Starting write watcher");
ev_io_start(EV_DEFAULT_ &write_watcher_);
return PQsendQuery(connection_, queryText);
}
void StopRead()
//flushes socket
void Flush()
{
LOG("Stoping read watcher");
ev_io_stop(EV_DEFAULT_ &read_watcher_);
}
void StartRead()
{
LOG("Starting read watcher");
ev_io_start(EV_DEFAULT_ &read_watcher_);
if(PQflush(connection_) == 1) {
TRACE("Flushing");
ev_io_start(EV_DEFAULT_ &write_watcher_);
}
}
//initializes initial async connection to postgres via libpq
//and hands off control to libev
bool Connect(const char* conninfo)
{
connection_ = PQconnectStart(conninfo);
@ -195,8 +191,9 @@ public:
int fd = PQsocket(connection_);
if(fd < 0) {
LOG("socket fd was negative. error");
return false;
}
printf("socket fd %d\n", fd);
assert(PQisnonblocking(connection_));
LOG("Setting watchers to socket");
@ -210,45 +207,7 @@ public:
return true;
}
protected:
static Handle<Value>
New (const Arguments& args)
{
HandleScope scope;
Connection *connection = new Connection();
connection->Wrap(args.This());
return args.This();
}
void HandleConnectionIO()
{
PostgresPollingStatusType status = PQconnectPoll(connection_);
switch(status) {
case PGRES_POLLING_READING:
LOG("Polled: PGRES_POLLING_READING");
StopWrite();
StartRead();
break;
case PGRES_POLLING_WRITING:
LOG("Polled: PGRES_POLLING_WRITING");
StopRead();
StartWrite();
break;
case PGRES_POLLING_FAILED:
LOG("Polled: PGRES_POLLING_FAILED");
break;
case PGRES_POLLING_OK:
LOG("Polled: PGRES_POLLING_OK");
connecting_ = false;
Emit(connect_symbol, 0, NULL);
StartRead();
default:
printf("Polled: %d\n", PQconnectPoll(connection_));
break;
}
}
//called to process io_events from libev
void HandleIOEvent(int revents)
{
if(revents & EV_ERROR) {
@ -287,11 +246,10 @@ protected:
PQfreemem(notify);
}
}
if(revents & EV_WRITE) {
LOG("revents & EV_WRITE");
TRACE("revents & EV_WRITE");
if (PQflush(connection_) == 0) {
StopWrite();
}
@ -299,8 +257,58 @@ protected:
}
private:
void AfterPollingWriting()
void HandleConnectionIO()
{
PostgresPollingStatusType status = PQconnectPoll(connection_);
switch(status) {
case PGRES_POLLING_READING:
LOG("Polled: PGRES_POLLING_READING");
StopWrite();
StartRead();
break;
case PGRES_POLLING_WRITING:
LOG("Polled: PGRES_POLLING_WRITING");
StopRead();
StartWrite();
break;
case PGRES_POLLING_FAILED:
StopRead();
StopWrite();
LOG("Polled: PGRES_POLLING_FAILED");
break;
case PGRES_POLLING_OK:
LOG("Polled: PGRES_POLLING_OK");
connecting_ = false;
Emit(connect_symbol, 0, NULL);
StartRead();
default:
printf("Unknown polling status: %d\n", status);
break;
}
}
void StopWrite()
{
TRACE("Stoping write watcher");
ev_io_stop(EV_DEFAULT_ &write_watcher_);
}
void StartWrite()
{
TRACE("Starting write watcher");
ev_io_start(EV_DEFAULT_ &write_watcher_);
}
void StopRead()
{
TRACE("Stoping read watcher");
ev_io_stop(EV_DEFAULT_ &read_watcher_);
}
void StartRead()
{
TRACE("Starting read watcher");
ev_io_start(EV_DEFAULT_ &read_watcher_);
}
};