diff --git a/src/binding.cc b/src/binding.cc index 5d17d8d1..a2d2d27d 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -249,6 +249,8 @@ public: bool ioInitialized_; bool copyOutMode_; bool copyInMode_; + bool reading_; + bool writing_; Connection () : ObjectWrap () { connection_ = NULL; @@ -256,6 +258,8 @@ public: ioInitialized_ = false; copyOutMode_ = false; copyInMode_ = false; + reading_ = false; + writing_ = false; TRACE("Initializing ev watchers"); read_watcher_.data = this; write_watcher_.data = this; @@ -304,6 +308,7 @@ protected: int Send(const char *queryText) { + TRACE("js::Send") int rv = PQsendQuery(connection_, queryText); StartWrite(); return rv; @@ -311,6 +316,7 @@ protected: int SendQueryParams(const char *command, const int nParams, const char * const *paramValues) { + TRACE("js::SendQueryParams") int rv = PQsendQueryParams(connection_, command, nParams, NULL, paramValues, NULL, NULL, 0); StartWrite(); return rv; @@ -318,6 +324,7 @@ protected: int SendPrepare(const char *name, const char *command, const int nParams) { + TRACE("js::SendPrepare") int rv = PQsendPrepare(connection_, name, command, nParams, NULL); StartWrite(); return rv; @@ -430,7 +437,7 @@ protected: if(PQconsumeInput(connection_) == 0) { End(); EmitLastError(); - LOG("Something happened, consume input is 0"); + //LOG("Something happened, consume input is 0"); return; } @@ -476,7 +483,8 @@ protected: if(revents & UV_WRITABLE) { TRACE("revents & UV_WRITABLE"); if (PQflush(connection_) == 0) { - StopWrite(); + //nothing left to write, poll the socket for more to read + StartRead(); } } } @@ -669,12 +677,10 @@ private: switch(status) { case PGRES_POLLING_READING: TRACE("Polled: PGRES_POLLING_READING"); - StopWrite(); StartRead(); break; case PGRES_POLLING_WRITING: TRACE("Polled: PGRES_POLLING_WRITING"); - StopRead(); StartWrite(); break; case PGRES_POLLING_FAILED: @@ -712,30 +718,42 @@ private: void StopWrite() { - TRACE("Stoping write watcher"); + TRACE("write STOP"); if(ioInitialized_) { uv_poll_stop(&write_watcher_); + writing_ = false; } } void StartWrite() { - TRACE("Starting write watcher"); + TRACE("write START"); + if(reading_) { + TRACE("stop READ to start WRITE"); + StopRead(); + } uv_poll_start(&write_watcher_, UV_WRITABLE, io_event); + writing_ = true; } void StopRead() { - TRACE("Stoping read watcher"); + TRACE("read STOP"); if(ioInitialized_) { uv_poll_stop(&read_watcher_); + reading_ = false; } } void StartRead() { - TRACE("Starting read watcher"); + TRACE("read START"); + if(writing_) { + TRACE("stop WRITE to start READ"); + StopWrite(); + } uv_poll_start(&read_watcher_, UV_READABLE, io_event); + reading_ = true; } //Converts a v8 array to an array of cstrings //the result char** array must be free() when it is no longer needed