diff --git a/src/binding.cc b/src/binding.cc index 43d89a69..32b88147 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -5,6 +5,8 @@ #include #define LOG(msg) printf("%s\n",msg) +#define TRACE(msg) printf("%s\n", msg); + #define THROW(msg) return ThrowException(Exception::Error(String::New(msg))); using namespace v8; @@ -16,6 +18,7 @@ class Connection : public EventEmitter { public: + //creates the V8 objects & attaches them to the module (target) static void Init (Handle target) { @@ -35,16 +38,16 @@ public: LOG("created class"); } + //static function called by libev as callback entrypoint static void io_event(EV_P_ ev_io *w, int revents) { - LOG("Received IO event"); + TRACE("Received IO event"); Connection *connection = static_cast(w->data); connection->HandleIOEvent(revents); - //ev_io_stop(EV_A w); } - + //v8 entry point into Connection#connect static Handle Connect(const Arguments& args) { @@ -61,6 +64,7 @@ public: return Undefined(); } + //v8 entry point into Connection#_sendQuery static Handle SendQuery(const Arguments& args) { @@ -72,24 +76,11 @@ public: if(result == 0) { THROW("PQsendQuery returned error code"); } - + //TODO should we flush before throw? self->Flush(); return Undefined(); } - int Send(const char *queryText) - { - return PQsendQuery(connection_, queryText); - } - - void Flush() - { - if(PQflush(connection_) == 1) { - ev_io_start(EV_DEFAULT_ &write_watcher_); - } - } - - ev_io read_watcher_; ev_io write_watcher_; PGconn *connection_; @@ -110,30 +101,35 @@ public: { } - void StopWrite() +protected: + + //v8 entry point to constructor + static Handle + New (const Arguments& args) { - LOG("Stoping write watcher"); - ev_io_stop(EV_DEFAULT_ &write_watcher_); + HandleScope scope; + Connection *connection = new Connection(); + connection->Wrap(args.This()); + + return args.This(); } - void StartWrite() + int Send(const char *queryText) { - LOG("Starting write watcher"); - ev_io_start(EV_DEFAULT_ &write_watcher_); + return PQsendQuery(connection_, queryText); } - void StopRead() + //flushes socket + void Flush() { - LOG("Stoping read watcher"); - ev_io_stop(EV_DEFAULT_ &read_watcher_); - } - - void StartRead() - { - LOG("Starting read watcher"); - ev_io_start(EV_DEFAULT_ &read_watcher_); + if(PQflush(connection_) == 1) { + TRACE("Flushing"); + ev_io_start(EV_DEFAULT_ &write_watcher_); + } } + //initializes initial async connection to postgres via libpq + //and hands off control to libev bool Connect(const char* conninfo) { connection_ = PQconnectStart(conninfo); @@ -195,8 +191,9 @@ public: int fd = PQsocket(connection_); if(fd < 0) { LOG("socket fd was negative. error"); + return false; } - printf("socket fd %d\n", fd); + assert(PQisnonblocking(connection_)); LOG("Setting watchers to socket"); @@ -210,45 +207,7 @@ public: return true; } -protected: - static Handle - New (const Arguments& args) - { - HandleScope scope; - Connection *connection = new Connection(); - connection->Wrap(args.This()); - - return args.This(); - } - - void HandleConnectionIO() - { - PostgresPollingStatusType status = PQconnectPoll(connection_); - switch(status) { - case PGRES_POLLING_READING: - LOG("Polled: PGRES_POLLING_READING"); - StopWrite(); - StartRead(); - break; - case PGRES_POLLING_WRITING: - LOG("Polled: PGRES_POLLING_WRITING"); - StopRead(); - StartWrite(); - break; - case PGRES_POLLING_FAILED: - LOG("Polled: PGRES_POLLING_FAILED"); - break; - case PGRES_POLLING_OK: - LOG("Polled: PGRES_POLLING_OK"); - connecting_ = false; - Emit(connect_symbol, 0, NULL); - StartRead(); - default: - printf("Polled: %d\n", PQconnectPoll(connection_)); - break; - } - } - + //called to process io_events from libev void HandleIOEvent(int revents) { if(revents & EV_ERROR) { @@ -287,11 +246,10 @@ protected: PQfreemem(notify); } - } if(revents & EV_WRITE) { - LOG("revents & EV_WRITE"); + TRACE("revents & EV_WRITE"); if (PQflush(connection_) == 0) { StopWrite(); } @@ -299,8 +257,58 @@ protected: } private: - void AfterPollingWriting() + void HandleConnectionIO() { + PostgresPollingStatusType status = PQconnectPoll(connection_); + switch(status) { + case PGRES_POLLING_READING: + LOG("Polled: PGRES_POLLING_READING"); + StopWrite(); + StartRead(); + break; + case PGRES_POLLING_WRITING: + LOG("Polled: PGRES_POLLING_WRITING"); + StopRead(); + StartWrite(); + break; + case PGRES_POLLING_FAILED: + StopRead(); + StopWrite(); + LOG("Polled: PGRES_POLLING_FAILED"); + break; + case PGRES_POLLING_OK: + LOG("Polled: PGRES_POLLING_OK"); + connecting_ = false; + Emit(connect_symbol, 0, NULL); + StartRead(); + default: + printf("Unknown polling status: %d\n", status); + break; + } + } + + void StopWrite() + { + TRACE("Stoping write watcher"); + ev_io_stop(EV_DEFAULT_ &write_watcher_); + } + + void StartWrite() + { + TRACE("Starting write watcher"); + ev_io_start(EV_DEFAULT_ &write_watcher_); + } + + void StopRead() + { + TRACE("Stoping read watcher"); + ev_io_stop(EV_DEFAULT_ &read_watcher_); + } + + void StartRead() + { + TRACE("Starting read watcher"); + ev_io_start(EV_DEFAULT_ &read_watcher_); } };