Merge branch 'master' into node_server_graceful_shutdown

This commit is contained in:
murgatroid99 2015-08-21 14:26:13 -07:00
commit 02360ea4f7
11 changed files with 114 additions and 51 deletions

View File

@ -5,11 +5,35 @@ Alpha : Ready for early adopters
## PREREQUISITES
- `node`: This requires `node` to be installed. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package.
- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core.
- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
## INSTALLATION
On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][].
Run the following command to install gRPC Node.js.
**Linux (Debian):**
Add [Debian unstable][] to your `sources.list` file. Example:
```sh
echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
sudo tee -a /etc/apt/sources.list
```
Install the gRPC Debian package
```sh
sudo apt-get update
sudo apt-get install libgrpc-dev
```
Install the gRPC NPM package
```sh
npm install grpc
```
**Mac OS X**
Install [homebrew][]. Run the following command to install gRPC Node.js.
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s nodejs
```
@ -88,5 +112,5 @@ ServerCredentials
An object with factory methods for creating credential objects for servers.
[homebrew]:http://brew.sh
[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[Debian unstable]:https://www.debian.org/releases/sid/

View File

@ -207,6 +207,13 @@ class SendMessageOp : public Op {
if (!::node::Buffer::HasInstance(value)) {
return false;
}
Handle<Object> object_value = value->ToObject();
if (object_value->HasOwnProperty(NanNew("grpcWriteFlags"))) {
Handle<Value> flag_value = object_value->Get(NanNew("grpcWriteFlags"));
if (flag_value->IsUint32()) {
out->flags = flag_value->Uint32Value() & GRPC_WRITE_USED_MASK;
}
}
out->data.send_message = BufferToByteBuffer(value);
Persistent<Value> *handle = new Persistent<Value>();
NanAssignPersistent(*handle, value);
@ -457,10 +464,6 @@ void Call::Init(Handle<Object> exports) {
NanNew<FunctionTemplate>(GetPeer)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
Handle<Function> ctr = tpl->GetFunction();
ctr->Set(NanNew("WRITE_BUFFER_HINT"),
NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
ctr->Set(NanNew("WRITE_NO_COMPRESS"),
NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
exports->Set(NanNew("Call"), ctr);
constructor = new NanCallback(ctr);
}
@ -620,7 +623,7 @@ NAN_METHOD(Call::StartBatch) {
call->wrapped_call, &ops[0], nops, new struct tag(
callback, op_vector.release(), resources), NULL);
if (error != GRPC_CALL_OK) {
return NanThrowError("startBatch failed", error);
return NanThrowError(nanErrorWithCode("startBatch failed", error));
}
CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
@ -634,7 +637,7 @@ NAN_METHOD(Call::Cancel) {
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL);
if (error != GRPC_CALL_OK) {
return NanThrowError("cancel failed", error);
return NanThrowError(nanErrorWithCode("cancel failed", error));
}
NanReturnUndefined();
}

View File

@ -51,6 +51,19 @@ namespace node {
using std::unique_ptr;
using std::shared_ptr;
/**
* Helper function for throwing errors with a grpc_call_error value.
* Modified from the answer by Gus Goose to
* http://stackoverflow.com/questions/31794200.
*/
inline v8::Local<v8::Value> nanErrorWithCode(const char *msg,
grpc_call_error code) {
NanEscapableScope();
v8::Local<v8::Object> err = NanError(msg).As<v8::Object>();
err->Set(NanNew("code"), NanNew<v8::Uint32>(code));
return NanEscapeScope(err);
}
v8::Handle<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array);
class PersistentHolder {

View File

@ -196,6 +196,16 @@ void InitConnectivityStateConstants(Handle<Object> exports) {
channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE);
}
void InitWriteFlags(Handle<Object> exports) {
NanScope();
Handle<Object> write_flags = NanNew<Object>();
exports->Set(NanNew("writeFlags"), write_flags);
Handle<Value> BUFFER_HINT(NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
write_flags->Set(NanNew("BUFFER_HINT"), BUFFER_HINT);
Handle<Value> NO_COMPRESS(NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
write_flags->Set(NanNew("NO_COMPRESS"), NO_COMPRESS);
}
void init(Handle<Object> exports) {
NanScope();
grpc_init();
@ -204,6 +214,7 @@ void init(Handle<Object> exports) {
InitOpTypeConstants(exports);
InitPropagateConstants(exports);
InitConnectivityStateConstants(exports);
InitWriteFlags(exports);
grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports);

View File

@ -233,7 +233,7 @@ NAN_METHOD(Server::RequestCall) {
new struct tag(new NanCallback(args[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr)));
if (error != GRPC_CALL_OK) {
return NanThrowError("requestCall failed", error);
return NanThrowError(nanErrorWithCode("requestCall failed", error));
}
CompletionQueueAsyncWorker::Next();
NanReturnUndefined();

View File

@ -45,17 +45,13 @@ function HealthImplementation(statusMap) {
this.statusMap = _.clone(statusMap);
}
HealthImplementation.prototype.setStatus = function(host, service, status) {
if (!this.statusMap[host]) {
this.statusMap[host] = {};
}
this.statusMap[host][service] = status;
HealthImplementation.prototype.setStatus = function(service, status) {
this.statusMap[service] = status;
};
HealthImplementation.prototype.check = function(call, callback){
var host = call.request.host;
var service = call.request.service;
var status = _.get(this.statusMap, [host, service], null);
var status = _.get(this.statusMap, service, null);
if (status === null) {
callback({code:grpc.status.NOT_FOUND});
} else {

View File

@ -32,8 +32,7 @@ syntax = "proto3";
package grpc.health.v1alpha;
message HealthCheckRequest {
string host = 1;
string service = 2;
string service = 1;
}
message HealthCheckResponse {
@ -47,4 +46,4 @@ message HealthCheckResponse {
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}
}

View File

@ -144,6 +144,11 @@ exports.propagate = grpc.propagate;
*/
exports.callError = grpc.callError;
/**
* Write flag name to code number mapping
*/
exports.writeFlags = grpc.writeFlags;
/**
* Credentials factories
*/

View File

@ -79,13 +79,19 @@ function ClientWritableStream(call, serialize) {
* implementation of a method needed for implementing stream.Writable.
* @access private
* @param {Buffer} chunk The chunk to write
* @param {string} encoding Ignored
* @param {string} encoding Used to pass write flags
* @param {function(Error=)} callback Called when the write is complete
*/
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
var message = this.serialize(chunk);
if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we
* can get to checking that it is valid flags */
message.grpcWriteFlags = encoding;
}
batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, event) {
if (err) {
// Something has gone wrong. Stop writing by failing to call callback
@ -273,8 +279,12 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
return;
}
var client_batch = {};
var message = serialize(argument);
if (options) {
message.grpcWriteFlags = options.flags;
}
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
@ -407,9 +417,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
return;
}
var start_batch = {};
var message = serialize(argument);
if (options) {
message.grpcWriteFlags = options.flags;
}
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {

View File

@ -115,8 +115,10 @@ function waitForCancel(call, emitter) {
* @param {function(*):Buffer=} serialize Serialization function for the
* response
* @param {Object=} metadata Optional trailing metadata to send with status
* @param {number=} flags Flags for modifying how the message is sent.
* Defaults to 0.
*/
function sendUnaryResponse(call, value, serialize, metadata) {
function sendUnaryResponse(call, value, serialize, metadata, flags) {
var end_batch = {};
var status = {
code: grpc.status.OK,
@ -130,7 +132,9 @@ function sendUnaryResponse(call, value, serialize, metadata) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
call.metadataSent = true;
}
end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
var message = serialize(value);
message.grpcWriteFlags = flags;
end_batch[grpc.opType.SEND_MESSAGE] = message;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
}
@ -254,7 +258,7 @@ function ServerWritableStream(call, serialize) {
* for implementing stream.Writable.
* @access private
* @param {Buffer} chunk The chunk of data to write
* @param {string} encoding Ignored
* @param {string} encoding Used to pass write flags
* @param {function(Error=)} callback Callback to indicate that the write is
* complete
*/
@ -265,7 +269,13 @@ function _write(chunk, encoding, callback) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
this.call.metadataSent = true;
}
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
var message = this.serialize(chunk);
if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we
* can get to checking that it is valid flags */
message.grpcWriteFlags = encoding;
}
batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, value) {
if (err) {
this.emit('error', err);
@ -450,14 +460,14 @@ function handleUnary(call, handler, metadata) {
if (emitter.cancelled) {
return;
}
handler.func(emitter, function sendUnaryData(err, value, trailer) {
handler.func(emitter, function sendUnaryData(err, value, trailer, flags) {
if (err) {
if (trailer) {
err.metadata = trailer;
}
handleError(call, err);
} else {
sendUnaryResponse(call, value, handler.serialize, trailer);
sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
});
});
@ -514,7 +524,7 @@ function handleClientStreaming(call, handler, metadata) {
});
waitForCancel(call, stream);
stream.metadata = metadata;
handler.func(stream, function(err, value, trailer) {
handler.func(stream, function(err, value, trailer, flags) {
stream.terminate();
if (err) {
if (trailer) {
@ -522,7 +532,7 @@ function handleClientStreaming(call, handler, metadata) {
}
handleError(call, err);
} else {
sendUnaryResponse(call, value, handler.serialize, trailer);
sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
});
}

View File

@ -41,13 +41,9 @@ var grpc = require('../');
describe('Health Checking', function() {
var statusMap = {
'': {
'': 'SERVING',
'grpc.test.TestService': 'NOT_SERVING',
},
virtual_host: {
'grpc.test.TestService': 'SERVING'
}
'': 'SERVING',
'grpc.test.TestServiceNotServing': 'NOT_SERVING',
'grpc.test.TestServiceServing': 'SERVING'
};
var healthServer = new grpc.Server();
healthServer.addProtoService(health.service,
@ -71,15 +67,15 @@ describe('Health Checking', function() {
});
});
it('should say that a disabled service is NOT_SERVING', function(done) {
healthClient.check({service: 'grpc.test.TestService'},
healthClient.check({service: 'grpc.test.TestServiceNotServing'},
function(err, response) {
assert.ifError(err);
assert.strictEqual(response.status, 'NOT_SERVING');
done();
});
});
it('should say that a service on another host is SERVING', function(done) {
healthClient.check({host: 'virtual_host', service: 'grpc.test.TestService'},
it('should say that an enabled service is SERVING', function(done) {
healthClient.check({service: 'grpc.test.TestServiceServing'},
function(err, response) {
assert.ifError(err);
assert.strictEqual(response.status, 'SERVING');
@ -93,12 +89,4 @@ describe('Health Checking', function() {
done();
});
});
it('should get NOT_FOUND if the host is not registered', function(done) {
healthClient.check({host: 'wrong_host', service: 'grpc.test.TestService'},
function(err, response) {
assert(err);
assert.strictEqual(err.code, grpc.status.NOT_FOUND);
done();
});
});
});