mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
Merge pull request #2696 from murgatroid99/node_client_connectivity
Wrap connectivity API, expose it to client as waitForReady
This commit is contained in:
commit
f5fa08c907
@ -33,12 +33,17 @@
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include "grpc/support/log.h"
|
||||
|
||||
#include <node.h>
|
||||
#include <nan.h>
|
||||
#include "grpc/grpc.h"
|
||||
#include "grpc/grpc_security.h"
|
||||
#include "call.h"
|
||||
#include "channel.h"
|
||||
#include "completion_queue_async_worker.h"
|
||||
#include "credentials.h"
|
||||
#include "timeval.h"
|
||||
|
||||
namespace grpc {
|
||||
namespace node {
|
||||
@ -51,6 +56,7 @@ using v8::Handle;
|
||||
using v8::HandleScope;
|
||||
using v8::Integer;
|
||||
using v8::Local;
|
||||
using v8::Number;
|
||||
using v8::Object;
|
||||
using v8::Persistent;
|
||||
using v8::String;
|
||||
@ -76,6 +82,12 @@ void Channel::Init(Handle<Object> exports) {
|
||||
NanNew<FunctionTemplate>(Close)->GetFunction());
|
||||
NanSetPrototypeTemplate(tpl, "getTarget",
|
||||
NanNew<FunctionTemplate>(GetTarget)->GetFunction());
|
||||
NanSetPrototypeTemplate(
|
||||
tpl, "getConnectivityState",
|
||||
NanNew<FunctionTemplate>(GetConnectivityState)->GetFunction());
|
||||
NanSetPrototypeTemplate(
|
||||
tpl, "watchConnectivityState",
|
||||
NanNew<FunctionTemplate>(WatchConnectivityState)->GetFunction());
|
||||
NanAssignPersistent(fun_tpl, tpl);
|
||||
Handle<Function> ctr = tpl->GetFunction();
|
||||
constructor = new NanCallback(ctr);
|
||||
@ -186,5 +198,52 @@ NAN_METHOD(Channel::GetTarget) {
|
||||
NanReturnValue(NanNew(grpc_channel_get_target(channel->wrapped_channel)));
|
||||
}
|
||||
|
||||
NAN_METHOD(Channel::GetConnectivityState) {
|
||||
NanScope();
|
||||
if (!HasInstance(args.This())) {
|
||||
return NanThrowTypeError(
|
||||
"getConnectivityState can only be called on Channel objects");
|
||||
}
|
||||
Channel *channel = ObjectWrap::Unwrap<Channel>(args.This());
|
||||
int try_to_connect = (int)args[0]->Equals(NanTrue());
|
||||
NanReturnValue(grpc_channel_check_connectivity_state(channel->wrapped_channel,
|
||||
try_to_connect));
|
||||
}
|
||||
|
||||
NAN_METHOD(Channel::WatchConnectivityState) {
|
||||
NanScope();
|
||||
if (!HasInstance(args.This())) {
|
||||
return NanThrowTypeError(
|
||||
"watchConnectivityState can only be called on Channel objects");
|
||||
}
|
||||
if (!args[0]->IsUint32()) {
|
||||
return NanThrowTypeError(
|
||||
"watchConnectivityState's first argument must be a channel state");
|
||||
}
|
||||
if (!(args[1]->IsNumber() || args[1]->IsDate())) {
|
||||
return NanThrowTypeError(
|
||||
"watchConnectivityState's second argument must be a date or a number");
|
||||
}
|
||||
if (!args[2]->IsFunction()) {
|
||||
return NanThrowTypeError(
|
||||
"watchConnectivityState's third argument must be a callback");
|
||||
}
|
||||
grpc_connectivity_state last_state =
|
||||
static_cast<grpc_connectivity_state>(args[0]->Uint32Value());
|
||||
double deadline = args[1]->NumberValue();
|
||||
Handle<Function> callback_func = args[2].As<Function>();
|
||||
NanCallback *callback = new NanCallback(callback_func);
|
||||
Channel *channel = ObjectWrap::Unwrap<Channel>(args.This());
|
||||
unique_ptr<OpVec> ops(new OpVec());
|
||||
grpc_channel_watch_connectivity_state(
|
||||
channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline),
|
||||
CompletionQueueAsyncWorker::GetQueue(),
|
||||
new struct tag(callback,
|
||||
ops.release(),
|
||||
shared_ptr<Resources>(nullptr)));
|
||||
CompletionQueueAsyncWorker::Next();
|
||||
NanReturnUndefined();
|
||||
}
|
||||
|
||||
} // namespace node
|
||||
} // namespace grpc
|
||||
|
||||
@ -64,6 +64,8 @@ class Channel : public ::node::ObjectWrap {
|
||||
static NAN_METHOD(New);
|
||||
static NAN_METHOD(Close);
|
||||
static NAN_METHOD(GetTarget);
|
||||
static NAN_METHOD(GetConnectivityState);
|
||||
static NAN_METHOD(WatchConnectivityState);
|
||||
static NanCallback *constructor;
|
||||
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
|
||||
|
||||
|
||||
@ -65,7 +65,7 @@ void CompletionQueueAsyncWorker::Execute() {
|
||||
result =
|
||||
grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
|
||||
if (!result.success) {
|
||||
SetErrorMessage("The batch encountered an error");
|
||||
SetErrorMessage("The async function encountered an error");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -159,12 +159,31 @@ void InitOpTypeConstants(Handle<Object> exports) {
|
||||
op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER);
|
||||
}
|
||||
|
||||
void InitConnectivityStateConstants(Handle<Object> exports) {
|
||||
NanScope();
|
||||
Handle<Object> channel_state = NanNew<Object>();
|
||||
exports->Set(NanNew("connectivityState"), channel_state);
|
||||
Handle<Value> IDLE(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_IDLE));
|
||||
channel_state->Set(NanNew("IDLE"), IDLE);
|
||||
Handle<Value> CONNECTING(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_CONNECTING));
|
||||
channel_state->Set(NanNew("CONNECTING"), CONNECTING);
|
||||
Handle<Value> READY(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_READY));
|
||||
channel_state->Set(NanNew("READY"), READY);
|
||||
Handle<Value> TRANSIENT_FAILURE(
|
||||
NanNew<Uint32, uint32_t>(GRPC_CHANNEL_TRANSIENT_FAILURE));
|
||||
channel_state->Set(NanNew("TRANSIENT_FAILURE"), TRANSIENT_FAILURE);
|
||||
Handle<Value> FATAL_FAILURE(
|
||||
NanNew<Uint32, uint32_t>(GRPC_CHANNEL_FATAL_FAILURE));
|
||||
channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE);
|
||||
}
|
||||
|
||||
void init(Handle<Object> exports) {
|
||||
NanScope();
|
||||
grpc_init();
|
||||
InitStatusConstants(exports);
|
||||
InitCallErrorConstants(exports);
|
||||
InitOpTypeConstants(exports);
|
||||
InitConnectivityStateConstants(exports);
|
||||
|
||||
grpc::node::Call::Init(exports);
|
||||
grpc::node::Channel::Init(exports);
|
||||
|
||||
@ -558,6 +558,35 @@ exports.makeClientConstructor = function(methods, serviceName) {
|
||||
this.updateMetadata = updateMetadata;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the client to be ready. The callback will be called when the
|
||||
* client has successfully connected to the server, and it will be called
|
||||
* with an error if the attempt to connect to the server has unrecoverablly
|
||||
* failed or if the deadline expires. This function will make the channel
|
||||
* start connecting if it has not already done so.
|
||||
* @param {(Date|Number)} deadline When to stop waiting for a connection. Pass
|
||||
* Infinity to wait forever.
|
||||
* @param {function(Error)} callback The callback to call when done attempting
|
||||
* to connect.
|
||||
*/
|
||||
Client.prototype.$waitForReady = function(deadline, callback) {
|
||||
var self = this;
|
||||
var checkState = function(err) {
|
||||
if (err) {
|
||||
callback(new Error('Failed to connect before the deadline'));
|
||||
}
|
||||
var new_state = self.channel.getConnectivityState(true);
|
||||
if (new_state === grpc.connectivityState.READY) {
|
||||
callback();
|
||||
} else if (new_state === grpc.connectivityState.FATAL_FAILURE) {
|
||||
callback(new Error('Failed to connect to server'));
|
||||
} else {
|
||||
self.channel.watchConnectivityState(new_state, deadline, checkState);
|
||||
}
|
||||
};
|
||||
checkState();
|
||||
};
|
||||
|
||||
_.each(methods, function(attrs, name) {
|
||||
var method_type;
|
||||
if (attrs.requestStream) {
|
||||
|
||||
@ -36,6 +36,26 @@
|
||||
var assert = require('assert');
|
||||
var grpc = require('bindings')('grpc.node');
|
||||
|
||||
/**
|
||||
* This is used for testing functions with multiple asynchronous calls that
|
||||
* can happen in different orders. This should be passed the number of async
|
||||
* function invocations that can occur last, and each of those should call this
|
||||
* function's return value
|
||||
* @param {function()} done The function that should be called when a test is
|
||||
* complete.
|
||||
* @param {number} count The number of calls to the resulting function if the
|
||||
* test passes.
|
||||
* @return {function()} The function that should be called at the end of each
|
||||
* sequence of asynchronous functions.
|
||||
*/
|
||||
function multiDone(done, count) {
|
||||
return function() {
|
||||
count -= 1;
|
||||
if (count <= 0) {
|
||||
done();
|
||||
}
|
||||
};
|
||||
}
|
||||
var insecureCreds = grpc.Credentials.createInsecure();
|
||||
|
||||
describe('channel', function() {
|
||||
@ -86,14 +106,16 @@ describe('channel', function() {
|
||||
});
|
||||
});
|
||||
describe('close', function() {
|
||||
var channel;
|
||||
beforeEach(function() {
|
||||
channel = new grpc.Channel('hostname', insecureCreds, {});
|
||||
});
|
||||
it('should succeed silently', function() {
|
||||
var channel = new grpc.Channel('hostname', insecureCreds, {});
|
||||
assert.doesNotThrow(function() {
|
||||
channel.close();
|
||||
});
|
||||
});
|
||||
it('should be idempotent', function() {
|
||||
var channel = new grpc.Channel('hostname', insecureCreds, {});
|
||||
assert.doesNotThrow(function() {
|
||||
channel.close();
|
||||
channel.close();
|
||||
@ -101,9 +123,68 @@ describe('channel', function() {
|
||||
});
|
||||
});
|
||||
describe('getTarget', function() {
|
||||
var channel;
|
||||
beforeEach(function() {
|
||||
channel = new grpc.Channel('hostname', insecureCreds, {});
|
||||
});
|
||||
it('should return a string', function() {
|
||||
var channel = new grpc.Channel('localhost', insecureCreds, {});
|
||||
assert.strictEqual(typeof channel.getTarget(), 'string');
|
||||
});
|
||||
});
|
||||
describe('getConnectivityState', function() {
|
||||
var channel;
|
||||
beforeEach(function() {
|
||||
channel = new grpc.Channel('hostname', insecureCreds, {});
|
||||
});
|
||||
it('should return IDLE for a new channel', function() {
|
||||
assert.strictEqual(channel.getConnectivityState(),
|
||||
grpc.connectivityState.IDLE);
|
||||
});
|
||||
});
|
||||
describe('watchConnectivityState', function() {
|
||||
var channel;
|
||||
beforeEach(function() {
|
||||
channel = new grpc.Channel('localhost', insecureCreds, {});
|
||||
});
|
||||
afterEach(function() {
|
||||
channel.close();
|
||||
});
|
||||
it('should time out if called alone', function(done) {
|
||||
var old_state = channel.getConnectivityState();
|
||||
var deadline = new Date();
|
||||
deadline.setSeconds(deadline.getSeconds() + 1);
|
||||
channel.watchConnectivityState(old_state, deadline, function(err, value) {
|
||||
assert(err);
|
||||
done();
|
||||
});
|
||||
});
|
||||
it('should complete if a connection attempt is forced', function(done) {
|
||||
var old_state = channel.getConnectivityState();
|
||||
var deadline = new Date();
|
||||
deadline.setSeconds(deadline.getSeconds() + 1);
|
||||
channel.watchConnectivityState(old_state, deadline, function(err, value) {
|
||||
assert.ifError(err);
|
||||
assert.notEqual(value.new_state, old_state);
|
||||
done();
|
||||
});
|
||||
channel.getConnectivityState(true);
|
||||
});
|
||||
it('should complete twice if called twice', function(done) {
|
||||
done = multiDone(done, 2);
|
||||
var old_state = channel.getConnectivityState();
|
||||
var deadline = new Date();
|
||||
deadline.setSeconds(deadline.getSeconds() + 1);
|
||||
channel.watchConnectivityState(old_state, deadline, function(err, value) {
|
||||
assert.ifError(err);
|
||||
assert.notEqual(value.new_state, old_state);
|
||||
done();
|
||||
});
|
||||
channel.watchConnectivityState(old_state, deadline, function(err, value) {
|
||||
assert.ifError(err);
|
||||
assert.notEqual(value.new_state, old_state);
|
||||
done();
|
||||
});
|
||||
channel.getConnectivityState(true);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -78,6 +78,19 @@ var callErrorNames = [
|
||||
'INVALID_FLAGS'
|
||||
];
|
||||
|
||||
/**
|
||||
* List of all connectivity state names
|
||||
* @const
|
||||
* @type {Array.<string>}
|
||||
*/
|
||||
var connectivityStateNames = [
|
||||
'IDLE',
|
||||
'CONNECTING',
|
||||
'READY',
|
||||
'TRANSIENT_FAILURE',
|
||||
'FATAL_FAILURE'
|
||||
];
|
||||
|
||||
describe('constants', function() {
|
||||
it('should have all of the status constants', function() {
|
||||
for (var i = 0; i < statusNames.length; i++) {
|
||||
@ -91,4 +104,10 @@ describe('constants', function() {
|
||||
'call error missing: ' + callErrorNames[i]);
|
||||
}
|
||||
});
|
||||
it('should have all of the connectivity states', function() {
|
||||
for (var i = 0; i < connectivityStateNames.length; i++) {
|
||||
assert(grpc.connectivityState.hasOwnProperty(connectivityStateNames[i]),
|
||||
'connectivity status missing: ' + connectivityStateNames[i]);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@ -47,6 +47,26 @@ var mathService = math_proto.lookup('math.Math');
|
||||
|
||||
var _ = require('lodash');
|
||||
|
||||
/**
|
||||
* This is used for testing functions with multiple asynchronous calls that
|
||||
* can happen in different orders. This should be passed the number of async
|
||||
* function invocations that can occur last, and each of those should call this
|
||||
* function's return value
|
||||
* @param {function()} done The function that should be called when a test is
|
||||
* complete.
|
||||
* @param {number} count The number of calls to the resulting function if the
|
||||
* test passes.
|
||||
* @return {function()} The function that should be called at the end of each
|
||||
* sequence of asynchronous functions.
|
||||
*/
|
||||
function multiDone(done, count) {
|
||||
return function() {
|
||||
count -= 1;
|
||||
if (count <= 0) {
|
||||
done();
|
||||
}
|
||||
};
|
||||
}
|
||||
var server_insecure_creds = grpc.ServerCredentials.createInsecure();
|
||||
|
||||
describe('File loader', function() {
|
||||
@ -112,6 +132,58 @@ describe('Server.prototype.addProtoService', function() {
|
||||
});
|
||||
});
|
||||
});
|
||||
describe('Client#$waitForReady', function() {
|
||||
var server;
|
||||
var port;
|
||||
var Client;
|
||||
var client;
|
||||
before(function() {
|
||||
server = new grpc.Server();
|
||||
port = server.bind('localhost:0', grpc.ServerCredentials.createInsecure());
|
||||
server.start();
|
||||
Client = surface_client.makeProtobufClientConstructor(mathService);
|
||||
});
|
||||
beforeEach(function() {
|
||||
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
});
|
||||
it('should complete when called alone', function(done) {
|
||||
client.$waitForReady(Infinity, function(error) {
|
||||
assert.ifError(error);
|
||||
done();
|
||||
});
|
||||
});
|
||||
it('should complete when a call is initiated', function(done) {
|
||||
client.$waitForReady(Infinity, function(error) {
|
||||
assert.ifError(error);
|
||||
done();
|
||||
});
|
||||
var call = client.div({}, function(err, response) {});
|
||||
call.cancel();
|
||||
});
|
||||
it('should complete if called more than once', function(done) {
|
||||
done = multiDone(done, 2);
|
||||
client.$waitForReady(Infinity, function(error) {
|
||||
assert.ifError(error);
|
||||
done();
|
||||
});
|
||||
client.$waitForReady(Infinity, function(error) {
|
||||
assert.ifError(error);
|
||||
done();
|
||||
});
|
||||
});
|
||||
it('should complete if called when already ready', function(done) {
|
||||
client.$waitForReady(Infinity, function(error) {
|
||||
assert.ifError(error);
|
||||
client.$waitForReady(Infinity, function(error) {
|
||||
assert.ifError(error);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
describe('Echo service', function() {
|
||||
var server;
|
||||
var client;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user