#include #include #include #include #include #define LOG(msg) printf("%s\n",msg); #define TRACE(msg) //printf("%s\n", msg); #define THROW(msg) return ThrowException(Exception::Error(String::New(msg))); using namespace v8; using namespace node; static Persistent severity_symbol; static Persistent code_symbol; static Persistent detail_symbol; static Persistent hint_symbol; static Persistent position_symbol; static Persistent internalPosition_symbol; static Persistent internalQuery_symbol; static Persistent where_symbol; static Persistent file_symbol; static Persistent line_symbol; static Persistent routine_symbol; static Persistent name_symbol; static Persistent value_symbol; static Persistent type_symbol; static Persistent channel_symbol; static Persistent payload_symbol; static Persistent emit_symbol; static Persistent command_symbol; class Connection : public ObjectWrap { public: //creates the V8 objects & attaches them to the module (target) static void Init (Handle target) { HandleScope scope; Local t = FunctionTemplate::New(New); t->InstanceTemplate()->SetInternalFieldCount(1); t->SetClassName(String::NewSymbol("Connection")); emit_symbol = NODE_PSYMBOL("emit"); severity_symbol = NODE_PSYMBOL("severity"); code_symbol = NODE_PSYMBOL("code"); detail_symbol = NODE_PSYMBOL("detail"); hint_symbol = NODE_PSYMBOL("hint"); position_symbol = NODE_PSYMBOL("position"); internalPosition_symbol = NODE_PSYMBOL("internalPosition"); internalQuery_symbol = NODE_PSYMBOL("internalQuery"); where_symbol = NODE_PSYMBOL("where"); file_symbol = NODE_PSYMBOL("file"); line_symbol = NODE_PSYMBOL("line"); routine_symbol = NODE_PSYMBOL("routine"); name_symbol = NODE_PSYMBOL("name"); value_symbol = NODE_PSYMBOL("value"); type_symbol = NODE_PSYMBOL("type"); channel_symbol = NODE_PSYMBOL("channel"); payload_symbol = NODE_PSYMBOL("payload"); command_symbol = NODE_PSYMBOL("command"); NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); NODE_SET_PROTOTYPE_METHOD(t, "_sendQuery", SendQuery); NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryWithParams", SendQueryWithParams); NODE_SET_PROTOTYPE_METHOD(t, "_sendPrepare", SendPrepare); NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryPrepared", SendQueryPrepared); NODE_SET_PROTOTYPE_METHOD(t, "cancel", Cancel); NODE_SET_PROTOTYPE_METHOD(t, "end", End); target->Set(String::NewSymbol("Connection"), t->GetFunction()); TRACE("created class"); } //static function called by libev as callback entrypoint static void io_event(uv_poll_t* w, int status, int revents) { TRACE("Received IO event"); Connection *connection = static_cast(w->data); connection->HandleIOEvent(revents); } //v8 entry point into Connection#connect static Handle Connect(const Arguments& args) { HandleScope scope; Connection *self = ObjectWrap::Unwrap(args.This()); if(args.Length() == 0 || !args[0]->IsString()) { THROW("Must include connection string as only argument to connect"); } String::Utf8Value conninfo(args[0]->ToString()); bool success = self->Connect(*conninfo); if(!success) { self -> EmitLastError(); self -> DestroyConnection(); } return Undefined(); } //v8 entry point into Connection#cancel static Handle Cancel(const Arguments& args) { HandleScope scope; Connection *self = ObjectWrap::Unwrap(args.This()); bool success = self->Cancel(); if(!success) { self -> EmitLastError(); self -> DestroyConnection(); } return Undefined(); } //v8 entry point into Connection#_sendQuery static Handle SendQuery(const Arguments& args) { HandleScope scope; Connection *self = ObjectWrap::Unwrap(args.This()); if(!args[0]->IsString()) { THROW("First parameter must be a string query"); } char* queryText = MallocCString(args[0]); int result = self->Send(queryText); free(queryText); if(result == 0) { THROW("PQsendQuery returned error code"); } //TODO should we flush before throw? self->Flush(); return Undefined(); } //v8 entry point into Connection#_sendQueryWithParams static Handle SendQueryWithParams(const Arguments& args) { HandleScope scope; //dispatch non-prepared parameterized query return DispatchParameterizedQuery(args, false); } //v8 entry point into Connection#_sendPrepare(string queryName, string queryText, int nParams) static Handle SendPrepare(const Arguments& args) { HandleScope scope; Connection *self = ObjectWrap::Unwrap(args.This()); String::Utf8Value queryName(args[0]); String::Utf8Value queryText(args[1]); int length = args[2]->Int32Value(); self->SendPrepare(*queryName, *queryText, length); return Undefined(); } //v8 entry point into Connection#_sendQueryPrepared(string queryName, string[] paramValues) static Handle SendQueryPrepared(const Arguments& args) { HandleScope scope; //dispatch prepared parameterized query return DispatchParameterizedQuery(args, true); } static Handle DispatchParameterizedQuery(const Arguments& args, bool isPrepared) { HandleScope scope; Connection *self = ObjectWrap::Unwrap(args.This()); String::Utf8Value queryName(args[0]); //TODO this is much copy/pasta code if(!args[0]->IsString()) { THROW("First parameter must be a string"); } if(!args[1]->IsArray()) { THROW("Values must be an array"); } Local jsParams = Local::Cast(args[1]); int len = jsParams->Length(); char** paramValues = ArgToCStringArray(jsParams); if(!paramValues) { THROW("Unable to allocate char **paramValues from Local of v8 params"); } char* queryText = MallocCString(args[0]); int result = 0; if(isPrepared) { result = self->SendPreparedQuery(queryText, len, paramValues); } else { result = self->SendQueryParams(queryText, len, paramValues); } free(queryText); ReleaseCStringArray(paramValues, len); if(result == 1) { return Undefined(); } self->EmitLastError(); THROW("Postgres returned non-1 result from query dispatch."); } //v8 entry point into Connection#end static Handle End(const Arguments& args) { HandleScope scope; Connection *self = ObjectWrap::Unwrap(args.This()); self->End(); return Undefined(); } uv_poll_t read_watcher_; uv_poll_t write_watcher_; PGconn *connection_; bool connecting_; Connection () : ObjectWrap () { connection_ = NULL; connecting_ = false; TRACE("Initializing ev watchers"); read_watcher_.data = this; write_watcher_.data = this; } ~Connection () { } protected: //v8 entry point to constructor static Handle New (const Arguments& args) { HandleScope scope; Connection *connection = new Connection(); connection->Wrap(args.This()); return args.This(); } int Send(const char *queryText) { int rv = PQsendQuery(connection_, queryText); StartWrite(); return rv; } int SendQueryParams(const char *command, const int nParams, const char * const *paramValues) { int rv = PQsendQueryParams(connection_, command, nParams, NULL, paramValues, NULL, NULL, 0); StartWrite(); return rv; } int SendPrepare(const char *name, const char *command, const int nParams) { int rv = PQsendPrepare(connection_, name, command, nParams, NULL); StartWrite(); return rv; } int SendPreparedQuery(const char *name, int nParams, const char * const *paramValues) { int rv = PQsendQueryPrepared(connection_, name, nParams, paramValues, NULL, NULL, 0); StartWrite(); return rv; } int Cancel() { PGcancel* pgCancel = PQgetCancel(connection_); char errbuf[256]; int result = PQcancel(pgCancel, errbuf, 256); StartWrite(); PQfreeCancel(pgCancel); return result; } //flushes socket void Flush() { if(PQflush(connection_) == 1) { TRACE("Flushing"); uv_poll_start(&write_watcher_, UV_WRITABLE, io_event); } } //safely destroys the connection at most 1 time void DestroyConnection() { if(connection_ != NULL) { PQfinish(connection_); connection_ = NULL; } } //initializes initial async connection to postgres via libpq //and hands off control to libev bool Connect(const char* conninfo) { connection_ = PQconnectStart(conninfo); if (!connection_) { LOG("Connection couldn't be created"); } if (PQsetnonblocking(connection_, 1) == -1) { LOG("Unable to set connection to non-blocking"); return false; } ConnStatusType status = PQstatus(connection_); if(CONNECTION_BAD == status) { LOG("Bad connection status"); return false; } int fd = PQsocket(connection_); if(fd < 0) { LOG("socket fd was negative. error"); return false; } assert(PQisnonblocking(connection_)); PQsetNoticeProcessor(connection_, NoticeReceiver, this); TRACE("Setting watchers to socket"); uv_poll_init(uv_default_loop(), &read_watcher_, fd); uv_poll_init(uv_default_loop(), &write_watcher_, fd); connecting_ = true; StartWrite(); Ref(); return true; } static void NoticeReceiver(void *arg, const char *message) { Connection *self = (Connection*)arg; self->HandleNotice(message); } void HandleNotice(const char *message) { HandleScope scope; Handle notice = String::New(message); Emit("notice", ¬ice); } //called to process io_events from libev void HandleIOEvent(int revents) { if(revents & EV_ERROR) { LOG("Connection error."); return; } if(connecting_) { TRACE("Processing connecting_ io"); HandleConnectionIO(); return; } if(revents & EV_READ) { TRACE("revents & EV_READ"); if(PQconsumeInput(connection_) == 0) { End(); EmitLastError(); LOG("Something happened, consume input is 0"); return; } //declare handlescope as this method is entered via a libev callback //and not part of the public v8 interface HandleScope scope; if (PQisBusy(connection_) == 0) { PGresult *result; bool didHandleResult = false; while ((result = PQgetResult(connection_))) { HandleResult(result); didHandleResult = true; PQclear(result); } //might have fired from notification if(didHandleResult) { Emit("_readyForQuery"); } } PGnotify *notify; while ((notify = PQnotifies(connection_))) { Local result = Object::New(); result->Set(channel_symbol, String::New(notify->relname)); result->Set(payload_symbol, String::New(notify->extra)); Handle res = (Handle)result; Emit("notification", &res); PQfreemem(notify); } } if(revents & EV_WRITE) { TRACE("revents & EV_WRITE"); if (PQflush(connection_) == 0) { StopWrite(); } } } void HandleResult(PGresult* result) { ExecStatusType status = PQresultStatus(result); switch(status) { case PGRES_TUPLES_OK: { HandleTuplesResult(result); EmitCommandMetaData(result); } break; case PGRES_FATAL_ERROR: HandleErrorResult(result); break; case PGRES_COMMAND_OK: case PGRES_EMPTY_QUERY: EmitCommandMetaData(result); break; default: printf("Unrecogized query status: %s\n", PQresStatus(status)); break; } } void EmitCommandMetaData(PGresult* result) { HandleScope scope; Local info = Object::New(); info->Set(command_symbol, String::New(PQcmdStatus(result))); info->Set(value_symbol, String::New(PQcmdTuples(result))); Handle e = (Handle)info; Emit("_cmdStatus", &e); } //maps the postgres tuple results to v8 objects //and emits row events //TODO look at emitting fewer events because the back & forth between //javascript & c++ might introduce overhead (requires benchmarking) void HandleTuplesResult(const PGresult* result) { HandleScope scope; int rowCount = PQntuples(result); for(int rowNumber = 0; rowNumber < rowCount; rowNumber++) { //create result object for this row Local row = Array::New(); int fieldCount = PQnfields(result); for(int fieldNumber = 0; fieldNumber < fieldCount; fieldNumber++) { Local field = Object::New(); //name of field char* fieldName = PQfname(result, fieldNumber); field->Set(name_symbol, String::New(fieldName)); //oid of type of field int fieldType = PQftype(result, fieldNumber); field->Set(type_symbol, Integer::New(fieldType)); //value of field if(PQgetisnull(result, rowNumber, fieldNumber)) { field->Set(value_symbol, Null()); } else { char* fieldValue = PQgetvalue(result, rowNumber, fieldNumber); field->Set(value_symbol, String::New(fieldValue)); } row->Set(Integer::New(fieldNumber), field); } Handle e = (Handle)row; Emit("_row", &e); } } void HandleErrorResult(const PGresult* result) { HandleScope scope; //instantiate the return object as an Error with the summary Postgres message Local msg = Local::Cast(Exception::Error(String::New(PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY)))); //add the other information returned by Postgres to the error object AttachErrorField(result, msg, severity_symbol, PG_DIAG_SEVERITY); AttachErrorField(result, msg, code_symbol, PG_DIAG_SQLSTATE); AttachErrorField(result, msg, detail_symbol, PG_DIAG_MESSAGE_DETAIL); AttachErrorField(result, msg, hint_symbol, PG_DIAG_MESSAGE_HINT); AttachErrorField(result, msg, position_symbol, PG_DIAG_STATEMENT_POSITION); AttachErrorField(result, msg, internalPosition_symbol, PG_DIAG_INTERNAL_POSITION); AttachErrorField(result, msg, internalQuery_symbol, PG_DIAG_INTERNAL_QUERY); AttachErrorField(result, msg, where_symbol, PG_DIAG_CONTEXT); AttachErrorField(result, msg, file_symbol, PG_DIAG_SOURCE_FILE); AttachErrorField(result, msg, line_symbol, PG_DIAG_SOURCE_LINE); AttachErrorField(result, msg, routine_symbol, PG_DIAG_SOURCE_FUNCTION); Handle m = msg; Emit("_error", &m); } void AttachErrorField(const PGresult *result, const Local msg, const Persistent symbol, int fieldcode) { char *val = PQresultErrorField(result, fieldcode); if(val) { msg->Set(symbol, String::New(val)); } } void End() { StopRead(); StopWrite(); DestroyConnection(); } private: //EventEmitter was removed from c++ in node v0.5.x void Emit(const char* message) { HandleScope scope; Handle args[1] = { String::New(message) }; Emit(1, args); } void Emit(const char* message, Handle* arg) { HandleScope scope; Handle args[2] = { String::New(message), *arg }; Emit(2, args); } void Emit(int length, Handle *args) { HandleScope scope; Local emit_v = this->handle_->Get(emit_symbol); assert(emit_v->IsFunction()); Local emit_f = emit_v.As(); TryCatch tc; emit_f->Call(this->handle_, length, args); if(tc.HasCaught()) { FatalException(tc); } } void HandleConnectionIO() { PostgresPollingStatusType status = PQconnectPoll(connection_); switch(status) { case PGRES_POLLING_READING: TRACE("Polled: PGRES_POLLING_READING"); StopWrite(); StartRead(); break; case PGRES_POLLING_WRITING: TRACE("Polled: PGRES_POLLING_WRITING"); StopRead(); StartWrite(); break; case PGRES_POLLING_FAILED: StopRead(); StopWrite(); TRACE("Polled: PGRES_POLLING_FAILED"); EmitLastError(); break; case PGRES_POLLING_OK: TRACE("Polled: PGRES_POLLING_OK"); connecting_ = false; StartRead(); Emit("connect"); default: //printf("Unknown polling status: %d\n", status); break; } } void EmitError(const char *message) { Local exception = Exception::Error(String::New(message)); Emit("_error", &exception); } void EmitLastError() { EmitError(PQerrorMessage(connection_)); } void StopWrite() { TRACE("Stoping write watcher"); uv_poll_stop(&write_watcher_); } void StartWrite() { TRACE("Starting write watcher"); uv_poll_start(&write_watcher_, UV_WRITABLE, io_event); } void StopRead() { TRACE("Stoping read watcher"); uv_poll_stop(&read_watcher_); } void StartRead() { TRACE("Starting read watcher"); uv_poll_start(&read_watcher_, UV_READABLE, io_event); } //Converts a v8 array to an array of cstrings //the result char** array must be free() when it is no longer needed //if for any reason the array cannot be created, returns 0 static char** ArgToCStringArray(Local params) { int len = params->Length(); char** paramValues = new char*[len]; for(int i = 0; i < len; i++) { Handle val = params->Get(i); if(val->IsString()) { char* cString = MallocCString(val); //will be 0 if could not malloc if(!cString) { LOG("ArgToCStringArray: OUT OF MEMORY OR SOMETHING BAD!"); ReleaseCStringArray(paramValues, i-1); return 0; } paramValues[i] = cString; } else if(val->IsNull()) { paramValues[i] = NULL; } else { //a paramter was not a string LOG("Parameter not a string"); ReleaseCStringArray(paramValues, i-1); return 0; } } return paramValues; } //helper function to release cString arrays static void ReleaseCStringArray(char **strArray, int len) { for(int i = 0; i < len; i++) { free(strArray[i]); } delete [] strArray; } //helper function to malloc new string from v8string static char* MallocCString(v8::Handle v8String) { String::Utf8Value utf8String(v8String->ToString()); char *cString = (char *) malloc(strlen(*utf8String) + 1); if(!cString) { return cString; } strcpy(cString, *utf8String); return cString; } }; extern "C" void init (Handle target) { HandleScope scope; Connection::Init(target); }