From c6d3cb0dfec6b39f9097288734dd57120950bda0 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 28 Jul 2015 15:18:57 -0700 Subject: [PATCH 1/4] Wrap connectivity API, expose it to client as waitForReady --- ext/channel.cc | 80 +++++++++++++++++++++++++ ext/channel.h | 2 + ext/completion_queue_async_worker.cc | 2 +- ext/node_grpc.cc | 19 ++++++ src/client.js | 41 +++++++++++++ test/channel_test.js | 88 +++++++++++++++++++++++++++- test/constant_test.js | 19 ++++++ test/surface_test.js | 76 ++++++++++++++++++++++++ 8 files changed, 323 insertions(+), 4 deletions(-) diff --git a/ext/channel.cc b/ext/channel.cc index c43b55f1..fa99c10d 100644 --- a/ext/channel.cc +++ b/ext/channel.cc @@ -33,12 +33,17 @@ #include +#include "grpc/support/log.h" + #include #include #include "grpc/grpc.h" #include "grpc/grpc_security.h" +#include "call.h" #include "channel.h" +#include "completion_queue_async_worker.h" #include "credentials.h" +#include "timeval.h" namespace grpc { namespace node { @@ -51,11 +56,31 @@ using v8::Handle; using v8::HandleScope; using v8::Integer; using v8::Local; +using v8::Number; using v8::Object; using v8::Persistent; using v8::String; using v8::Value; +class ConnectivityStateOp : public Op { + public: + Handle GetNodeValue() const { + return NanNew(new_state); + } + + bool ParseOp(Handle value, grpc_op *out, + shared_ptr resources) { + return true; + } + + grpc_connectivity_state new_state; + + protected: + std::string GetTypeString() const { + return "new_state"; + } +}; + NanCallback *Channel::constructor; Persistent Channel::fun_tpl; @@ -78,6 +103,12 @@ void Channel::Init(Handle exports) { NanNew(Close)->GetFunction()); NanSetPrototypeTemplate(tpl, "getTarget", NanNew(GetTarget)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "getConnectivityState", + NanNew(GetConnectivityState)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "watchConnectivityState", + NanNew(WatchConnectivityState)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); Handle ctr = tpl->GetFunction(); constructor = new NanCallback(ctr); @@ -196,5 +227,54 @@ NAN_METHOD(Channel::GetTarget) { NanReturnValue(NanNew(grpc_channel_get_target(channel->wrapped_channel))); } +NAN_METHOD(Channel::GetConnectivityState) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "getConnectivityState can only be called on Channel objects"); + } + Channel *channel = ObjectWrap::Unwrap(args.This()); + int try_to_connect = (int)args[0]->Equals(NanTrue()); + NanReturnValue(grpc_channel_check_connectivity_state(channel->wrapped_channel, + try_to_connect)); +} + +NAN_METHOD(Channel::WatchConnectivityState) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "watchConnectivityState can only be called on Channel objects"); + } + if (!args[0]->IsUint32()) { + return NanThrowTypeError( + "watchConnectivityState's first argument must be a channel state"); + } + if (!(args[1]->IsNumber() || args[1]->IsDate())) { + return NanThrowTypeError( + "watchConnectivityState's second argument must be a date or a number"); + } + if (!args[2]->IsFunction()) { + return NanThrowTypeError( + "watchConnectivityState's third argument must be a callback"); + } + grpc_connectivity_state last_state = + static_cast(args[0]->Uint32Value()); + double deadline = args[1]->NumberValue(); + Handle callback_func = args[2].As(); + NanCallback *callback = new NanCallback(callback_func); + Channel *channel = ObjectWrap::Unwrap(args.This()); + ConnectivityStateOp *op = new ConnectivityStateOp(); + unique_ptr ops(new OpVec()); + ops->push_back(unique_ptr(op)); + grpc_channel_watch_connectivity_state( + channel->wrapped_channel, last_state, &op->new_state, + MillisecondsToTimespec(deadline), CompletionQueueAsyncWorker::GetQueue(), + new struct tag(callback, + ops.release(), + shared_ptr(nullptr))); + CompletionQueueAsyncWorker::Next(); + NanReturnUndefined(); +} + } // namespace node } // namespace grpc diff --git a/ext/channel.h b/ext/channel.h index 6725ebb0..c4e83a32 100644 --- a/ext/channel.h +++ b/ext/channel.h @@ -67,6 +67,8 @@ class Channel : public ::node::ObjectWrap { static NAN_METHOD(New); static NAN_METHOD(Close); static NAN_METHOD(GetTarget); + static NAN_METHOD(GetConnectivityState); + static NAN_METHOD(WatchConnectivityState); static NanCallback *constructor; static v8::Persistent fun_tpl; diff --git a/ext/completion_queue_async_worker.cc b/ext/completion_queue_async_worker.cc index 1215c97e..c45e303f 100644 --- a/ext/completion_queue_async_worker.cc +++ b/ext/completion_queue_async_worker.cc @@ -65,7 +65,7 @@ void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME)); if (!result.success) { - SetErrorMessage("The batch encountered an error"); + SetErrorMessage("The asnyc function encountered an error"); } } diff --git a/ext/node_grpc.cc b/ext/node_grpc.cc index 4e31cbaa..331ccb60 100644 --- a/ext/node_grpc.cc +++ b/ext/node_grpc.cc @@ -159,12 +159,31 @@ void InitOpTypeConstants(Handle exports) { op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); } +void InitConnectivityStateConstants(Handle exports) { + NanScope(); + Handle channel_state = NanNew(); + exports->Set(NanNew("connectivityState"), channel_state); + Handle IDLE(NanNew(GRPC_CHANNEL_IDLE)); + channel_state->Set(NanNew("IDLE"), IDLE); + Handle CONNECTING(NanNew(GRPC_CHANNEL_CONNECTING)); + channel_state->Set(NanNew("CONNECTING"), CONNECTING); + Handle READY(NanNew(GRPC_CHANNEL_READY)); + channel_state->Set(NanNew("READY"), READY); + Handle TRANSIENT_FAILURE( + NanNew(GRPC_CHANNEL_TRANSIENT_FAILURE)); + channel_state->Set(NanNew("TRANSIENT_FAILURE"), TRANSIENT_FAILURE); + Handle FATAL_FAILURE( + NanNew(GRPC_CHANNEL_FATAL_FAILURE)); + channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE); +} + void init(Handle exports) { NanScope(); grpc_init(); InitStatusConstants(exports); InitCallErrorConstants(exports); InitOpTypeConstants(exports); + InitConnectivityStateConstants(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/src/client.js b/src/client.js index f843669b..39392dff 100644 --- a/src/client.js +++ b/src/client.js @@ -551,6 +551,47 @@ exports.makeClientConstructor = function(methods, serviceName) { this.updateMetadata = updateMetadata; } + /** + * Wait for the client to be ready. The callback will be called when the + * client has successfully connected to the server, and it will be called + * with an error if the attempt to connect to the server has unrecoverablly + * failed or if the deadline expires. This function does not automatically + * attempt to initiate the connection, so the callback will not be called + * unless you also start a method call or call $tryConnect. + * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass + * Infinity to wait forever. + * @param {function(Error)} callback The callback to call when done attempting + * to connect. + */ + Client.prototype.$waitForReady = function(deadline, callback) { + var self = this; + var checkState = function(err, result) { + if (err) { + callback(new Error('Failed to connect before the deadline')); + } + var new_state = result.new_state; + console.log(result); + if (new_state === grpc.connectivityState.READY) { + callback(); + } else if (new_state === grpc.connectivityState.FATAL_FAILURE) { + callback(new Error('Failed to connect to server')); + } else { + self.channel.watchConnectivityState(new_state, deadline, checkState); + } + }; + checkState(null, {new_state: this.channel.getConnectivityState()}); + }; + + /** + * Attempt to connect to the server. That will happen automatically if + * you try to make a method call with this client, so this function should + * only be used if you want to know that you have a connection before making + * any calls. + */ + Client.prototype.$tryConnect = function() { + this.channel.getConnectivityState(true); + }; + _.each(methods, function(attrs, name) { var method_type; if (attrs.requestStream) { diff --git a/test/channel_test.js b/test/channel_test.js index 3e61d3bb..feb3e672 100644 --- a/test/channel_test.js +++ b/test/channel_test.js @@ -36,6 +36,27 @@ var assert = require('assert'); var grpc = require('bindings')('grpc.node'); +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} + describe('channel', function() { describe('constructor', function() { it('should require a string for the first argument', function() { @@ -73,14 +94,16 @@ describe('channel', function() { }); }); describe('close', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', {}); + }); it('should succeed silently', function() { - var channel = new grpc.Channel('hostname', {}); assert.doesNotThrow(function() { channel.close(); }); }); it('should be idempotent', function() { - var channel = new grpc.Channel('hostname', {}); assert.doesNotThrow(function() { channel.close(); channel.close(); @@ -88,9 +111,68 @@ describe('channel', function() { }); }); describe('getTarget', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', {}); + }); it('should return a string', function() { - var channel = new grpc.Channel('localhost', {}); assert.strictEqual(typeof channel.getTarget(), 'string'); }); }); + describe('getConnectivityState', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', {}); + }); + it('should return IDLE for a new channel', function() { + assert.strictEqual(channel.getConnectivityState(), + grpc.connectivityState.IDLE); + }); + }); + describe('watchConnectivityState', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('localhost', {}); + }); + afterEach(function() { + channel.close(); + }); + it('should time out if called alone', function(done) { + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert(err); + done(); + }); + }); + it('should complete if a connection attempt is forced', function(done) { + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.getConnectivityState(true); + }); + it('should complete twice if called twice', function(done) { + done = multiDone(done, 2); + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.getConnectivityState(true); + }); + }); }); diff --git a/test/constant_test.js b/test/constant_test.js index ecc98ec4..93bf0c8a 100644 --- a/test/constant_test.js +++ b/test/constant_test.js @@ -78,6 +78,19 @@ var callErrorNames = [ 'INVALID_FLAGS' ]; +/** + * List of all connectivity state names + * @const + * @type {Array.} + */ +var connectivityStateNames = [ + 'IDLE', + 'CONNECTING', + 'READY', + 'TRANSIENT_FAILURE', + 'FATAL_FAILURE' +]; + describe('constants', function() { it('should have all of the status constants', function() { for (var i = 0; i < statusNames.length; i++) { @@ -91,4 +104,10 @@ describe('constants', function() { 'call error missing: ' + callErrorNames[i]); } }); + it('should have all of the connectivity states', function() { + for (var i = 0; i < connectivityStateNames.length; i++) { + assert(grpc.connectivityState.hasOwnProperty(connectivityStateNames[i]), + 'connectivity status missing: ' + connectivityStateNames[i]); + } + }); }); diff --git a/test/surface_test.js b/test/surface_test.js index 98f9b15b..4711658e 100644 --- a/test/surface_test.js +++ b/test/surface_test.js @@ -47,6 +47,27 @@ var mathService = math_proto.lookup('math.Math'); var _ = require('lodash'); +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} + describe('File loader', function() { it('Should load a proto file by default', function() { assert.doesNotThrow(function() { @@ -110,6 +131,61 @@ describe('Server.prototype.addProtoService', function() { }); }); }); +describe('Client#$waitForReady', function() { + var server; + var port; + var Client; + var client; + before(function() { + server = new grpc.Server(); + port = server.bind('localhost:0'); + server.start(); + Client = surface_client.makeProtobufClientConstructor(mathService); + }); + beforeEach(function() { + client = new Client('localhost:' + port); + }); + after(function() { + server.shutdown(); + }); + it('should complete when a call is initiated', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + var call = client.div({}, function(err, response) {}); + call.cancel(); + }); + it('should complete if $tryConnect is called', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + client.$tryConnect(); + }); + it('should complete if called more than once', function(done) { + done = multiDone(done, 2); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + client.$tryConnect(); + }); + it('should complete if called when already ready', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + }); + client.$tryConnect(); + }); +}); describe('Echo service', function() { var server; var client; From d71afbc48bf9616d414cfcd250956e3f0810979b Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 13 Aug 2015 11:00:13 -0700 Subject: [PATCH 2/4] Fixed typo --- ext/completion_queue_async_worker.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/completion_queue_async_worker.cc b/ext/completion_queue_async_worker.cc index c45e303f..b3e8b960 100644 --- a/ext/completion_queue_async_worker.cc +++ b/ext/completion_queue_async_worker.cc @@ -65,7 +65,7 @@ void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME)); if (!result.success) { - SetErrorMessage("The asnyc function encountered an error"); + SetErrorMessage("The async function encountered an error"); } } From 46a84765018ee8019b364a897fc3da907cf74c45 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 13 Aug 2015 11:24:34 -0700 Subject: [PATCH 3/4] Modified watchState functions to match C API --- ext/channel.cc | 25 ++----------------------- src/client.js | 22 +++++----------------- test/surface_test.js | 13 ++++++------- 3 files changed, 13 insertions(+), 47 deletions(-) diff --git a/ext/channel.cc b/ext/channel.cc index aa1ca031..907178f1 100644 --- a/ext/channel.cc +++ b/ext/channel.cc @@ -62,25 +62,6 @@ using v8::Persistent; using v8::String; using v8::Value; -class ConnectivityStateOp : public Op { - public: - Handle GetNodeValue() const { - return NanNew(new_state); - } - - bool ParseOp(Handle value, grpc_op *out, - shared_ptr resources) { - return true; - } - - grpc_connectivity_state new_state; - - protected: - std::string GetTypeString() const { - return "new_state"; - } -}; - NanCallback *Channel::constructor; Persistent Channel::fun_tpl; @@ -252,12 +233,10 @@ NAN_METHOD(Channel::WatchConnectivityState) { Handle callback_func = args[2].As(); NanCallback *callback = new NanCallback(callback_func); Channel *channel = ObjectWrap::Unwrap(args.This()); - ConnectivityStateOp *op = new ConnectivityStateOp(); unique_ptr ops(new OpVec()); - ops->push_back(unique_ptr(op)); grpc_channel_watch_connectivity_state( - channel->wrapped_channel, last_state, &op->new_state, - MillisecondsToTimespec(deadline), CompletionQueueAsyncWorker::GetQueue(), + channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline), + CompletionQueueAsyncWorker::GetQueue(), new struct tag(callback, ops.release(), shared_ptr(nullptr))); diff --git a/src/client.js b/src/client.js index f47e030f..0cd29e5b 100644 --- a/src/client.js +++ b/src/client.js @@ -562,9 +562,8 @@ exports.makeClientConstructor = function(methods, serviceName) { * Wait for the client to be ready. The callback will be called when the * client has successfully connected to the server, and it will be called * with an error if the attempt to connect to the server has unrecoverablly - * failed or if the deadline expires. This function does not automatically - * attempt to initiate the connection, so the callback will not be called - * unless you also start a method call or call $tryConnect. + * failed or if the deadline expires. This function will make the channel + * start connecting if it has not already done so. * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass * Infinity to wait forever. * @param {function(Error)} callback The callback to call when done attempting @@ -572,12 +571,11 @@ exports.makeClientConstructor = function(methods, serviceName) { */ Client.prototype.$waitForReady = function(deadline, callback) { var self = this; - var checkState = function(err, result) { + var checkState = function(err) { if (err) { callback(new Error('Failed to connect before the deadline')); } - var new_state = result.new_state; - console.log(result); + var new_state = this.channel.getConnectivityState(true); if (new_state === grpc.connectivityState.READY) { callback(); } else if (new_state === grpc.connectivityState.FATAL_FAILURE) { @@ -586,17 +584,7 @@ exports.makeClientConstructor = function(methods, serviceName) { self.channel.watchConnectivityState(new_state, deadline, checkState); } }; - checkState(null, {new_state: this.channel.getConnectivityState()}); - }; - - /** - * Attempt to connect to the server. That will happen automatically if - * you try to make a method call with this client, so this function should - * only be used if you want to know that you have a connection before making - * any calls. - */ - Client.prototype.$tryConnect = function() { - this.channel.getConnectivityState(true); + checkState(); }; _.each(methods, function(attrs, name) { diff --git a/test/surface_test.js b/test/surface_test.js index f002c8b6..b0f3aa4b 100644 --- a/test/surface_test.js +++ b/test/surface_test.js @@ -149,6 +149,12 @@ describe('Client#$waitForReady', function() { after(function() { server.shutdown(); }); + it('should complete when called alone', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + }); it('should complete when a call is initiated', function(done) { client.$waitForReady(Infinity, function(error) { assert.ifError(error); @@ -157,13 +163,6 @@ describe('Client#$waitForReady', function() { var call = client.div({}, function(err, response) {}); call.cancel(); }); - it('should complete if $tryConnect is called', function(done) { - client.$waitForReady(Infinity, function(error) { - assert.ifError(error); - done(); - }); - client.$tryConnect(); - }); it('should complete if called more than once', function(done) { done = multiDone(done, 2); client.$waitForReady(Infinity, function(error) { From cb408b39bc3e2e19688a7e0dc1387c74883bc681 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 13 Aug 2015 11:26:57 -0700 Subject: [PATCH 4/4] Further fixed connectivity tests --- src/client.js | 2 +- test/surface_test.js | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/client.js b/src/client.js index 0cd29e5b..d14713f3 100644 --- a/src/client.js +++ b/src/client.js @@ -575,7 +575,7 @@ exports.makeClientConstructor = function(methods, serviceName) { if (err) { callback(new Error('Failed to connect before the deadline')); } - var new_state = this.channel.getConnectivityState(true); + var new_state = self.channel.getConnectivityState(true); if (new_state === grpc.connectivityState.READY) { callback(); } else if (new_state === grpc.connectivityState.FATAL_FAILURE) { diff --git a/test/surface_test.js b/test/surface_test.js index b0f3aa4b..098905e7 100644 --- a/test/surface_test.js +++ b/test/surface_test.js @@ -139,7 +139,7 @@ describe('Client#$waitForReady', function() { var client; before(function() { server = new grpc.Server(); - port = server.bind('localhost:0'); + port = server.bind('localhost:0', grpc.ServerCredentials.createInsecure()); server.start(); Client = surface_client.makeProtobufClientConstructor(mathService); }); @@ -173,7 +173,6 @@ describe('Client#$waitForReady', function() { assert.ifError(error); done(); }); - client.$tryConnect(); }); it('should complete if called when already ready', function(done) { client.$waitForReady(Infinity, function(error) { @@ -183,7 +182,6 @@ describe('Client#$waitForReady', function() { done(); }); }); - client.$tryConnect(); }); }); describe('Echo service', function() {