From 146c61aeec465de713c458b1d212c508f2d8ea31 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 22 Jul 2015 15:02:51 -0700 Subject: [PATCH 01/10] Add compression disabling without breaking anything else --- ext/call.cc | 7 +++++++ src/client.js | 22 ++++++++++++++++------ src/server.js | 22 ++++++++++++++-------- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index 15c9b2d9..7d21b8b4 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -207,6 +207,13 @@ class SendMessageOp : public Op { if (!::node::Buffer::HasInstance(value)) { return false; } + Handle object_value = value->ToObject(); + if (object_value->HasOwnProperty(NanNew("grpcWriteFlags"))) { + Handle flag_value = object_value->Get(NanNew("grpcWriteFlags")); + if (flag_value->IsUint32()) { + out->flags = flag_value->Uint32Value() & GRPC_WRITE_USED_MASK; + } + } out->data.send_message = BufferToByteBuffer(value); Persistent *handle = new Persistent(); NanAssignPersistent(*handle, value); diff --git a/src/client.js b/src/client.js index b7bad949..de9efd7e 100644 --- a/src/client.js +++ b/src/client.js @@ -72,13 +72,15 @@ function ClientWritableStream(call, serialize) { * Attempt to write the given chunk. Calls the callback when done. This is an * implementation of a method needed for implementing stream.Writable. * @param {Buffer} chunk The chunk to write - * @param {string} encoding Ignored + * @param {string} encoding Used to pass write flags * @param {function(Error=)} callback Called when the write is complete */ function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; - batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + var message = this.serialize(chunk); + message.grpcWriteFlags = encoding; + batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, event) { if (err) { // Something has gone wrong. Stop writing by failing to call callback @@ -207,9 +209,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { * call * @param {(number|Date)=} deadline The deadline for processing this request. * Defaults to infinite future + * @param {number=} flags Flags for modifying how the message is sent. + * Defaults to 0. * @return {EventEmitter} An event emitter for stream related events */ - function makeUnaryRequest(argument, callback, metadata, deadline) { + function makeUnaryRequest(argument, callback, metadata, deadline, flags) { /* jshint validthis: true */ if (deadline === undefined) { deadline = Infinity; @@ -229,8 +233,10 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { return; } var client_batch = {}; + var message = serialize(argument); + message.grpcWriteFlags = flags; client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; - client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + client_batch[grpc.opType.SEND_MESSAGE] = message; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; client_batch[grpc.opType.RECV_MESSAGE] = true; @@ -352,9 +358,11 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { * call * @param {(number|Date)=} deadline The deadline for processing this request. * Defaults to infinite future + * @param {number=} flags Flags for modifying how the message is sent. + * Defaults to 0. * @return {EventEmitter} An event emitter for stream related events */ - function makeServerStreamRequest(argument, metadata, deadline) { + function makeServerStreamRequest(argument, metadata, deadline, flags) { /* jshint validthis: true */ if (deadline === undefined) { deadline = Infinity; @@ -371,9 +379,11 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { return; } var start_batch = {}; + var message = serialize(argument); + message.grpcWriteFlags = flags; start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + start_batch[grpc.opType.SEND_MESSAGE] = message; start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(start_batch, function(err, response) { if (err) { diff --git a/src/server.js b/src/server.js index 0a3a0031..776fafb9 100644 --- a/src/server.js +++ b/src/server.js @@ -107,8 +107,10 @@ function waitForCancel(call, emitter) { * @param {function(*):Buffer=} serialize Serialization function for the * response * @param {Object=} metadata Optional trailing metadata to send with status + * @param {number=} flags Flags for modifying how the message is sent. + * Defaults to 0. */ -function sendUnaryResponse(call, value, serialize, metadata) { +function sendUnaryResponse(call, value, serialize, metadata, flags) { var end_batch = {}; var status = { code: grpc.status.OK, @@ -122,7 +124,9 @@ function sendUnaryResponse(call, value, serialize, metadata) { end_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; call.metadataSent = true; } - end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); + var message = serialize(value); + message.grpcWriteFlags = flags; + end_batch[grpc.opType.SEND_MESSAGE] = message; end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(end_batch, function (){}); } @@ -243,7 +247,7 @@ function ServerWritableStream(call, serialize) { * Start writing a chunk of data. This is an implementation of a method required * for implementing stream.Writable. * @param {Buffer} chunk The chunk of data to write - * @param {string} encoding Ignored + * @param {string} encoding Used to pass write flags * @param {function(Error=)} callback Callback to indicate that the write is * complete */ @@ -254,7 +258,9 @@ function _write(chunk, encoding, callback) { batch[grpc.opType.SEND_INITIAL_METADATA] = {}; this.call.metadataSent = true; } - batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + var message = this.serialize(chunk); + message.grpcWriteFlags = encoding; + batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, value) { if (err) { this.emit('error', err); @@ -411,14 +417,14 @@ function handleUnary(call, handler, metadata) { if (emitter.cancelled) { return; } - handler.func(emitter, function sendUnaryData(err, value, trailer) { + handler.func(emitter, function sendUnaryData(err, value, trailer, flags) { if (err) { if (trailer) { err.metadata = trailer; } handleError(call, err); } else { - sendUnaryResponse(call, value, handler.serialize, trailer); + sendUnaryResponse(call, value, handler.serialize, trailer, flags); } }); }); @@ -473,7 +479,7 @@ function handleClientStreaming(call, handler, metadata) { }); waitForCancel(call, stream); stream.metadata = metadata; - handler.func(stream, function(err, value, trailer) { + handler.func(stream, function(err, value, trailer, flags) { stream.terminate(); if (err) { if (trailer) { @@ -481,7 +487,7 @@ function handleClientStreaming(call, handler, metadata) { } handleError(call, err); } else { - sendUnaryResponse(call, value, handler.serialize, trailer); + sendUnaryResponse(call, value, handler.serialize, trailer, flags); } }); } From 0f7084ff7a06d490f1b086c76248835becb632b4 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 17 Aug 2015 11:36:03 -0700 Subject: [PATCH 02/10] Added an inline C++ function to replace a deprecated nan function --- ext/call.cc | 4 ++-- ext/call.h | 13 +++++++++++++ ext/server.cc | 2 +- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index 705c80ff..1e76ea71 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -619,7 +619,7 @@ NAN_METHOD(Call::StartBatch) { call->wrapped_call, &ops[0], nops, new struct tag( callback, op_vector.release(), resources), NULL); if (error != GRPC_CALL_OK) { - return NanThrowError("startBatch failed", error); + return NanThrowError(nanErrorWithCode("startBatch failed", error)); } CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); @@ -633,7 +633,7 @@ NAN_METHOD(Call::Cancel) { Call *call = ObjectWrap::Unwrap(args.This()); grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL); if (error != GRPC_CALL_OK) { - return NanThrowError("cancel failed", error); + return NanThrowError(nanErrorWithCode("cancel failed", error)); } NanReturnUndefined(); } diff --git a/ext/call.h b/ext/call.h index 6acda761..ef6e5fcd 100644 --- a/ext/call.h +++ b/ext/call.h @@ -51,6 +51,19 @@ namespace node { using std::unique_ptr; using std::shared_ptr; +/** + * Helper function for throwing errors with a grpc_call_error value. + * Modified from the answer by Gus Goose to + * http://stackoverflow.com/questions/31794200. + */ +inline v8::Local nanErrorWithCode(const char *msg, + grpc_call_error code) { + NanEscapableScope(); + v8::Local err = NanError(msg).As(); + err->Set(NanNew("code"), NanNew(code)); + return NanEscapeScope(err); +} + v8::Handle ParseMetadata(const grpc_metadata_array *metadata_array); class PersistentHolder { diff --git a/ext/server.cc b/ext/server.cc index 8e396448..01217bce 100644 --- a/ext/server.cc +++ b/ext/server.cc @@ -235,7 +235,7 @@ NAN_METHOD(Server::RequestCall) { new struct tag(new NanCallback(args[0].As()), ops.release(), shared_ptr(nullptr))); if (error != GRPC_CALL_OK) { - return NanThrowError("requestCall failed", error); + return NanThrowError(nanErrorWithCode("requestCall failed", error)); } CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); From ff25af01630c8ba83dd9c532b22d223c303f71b7 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 17 Aug 2015 14:00:31 -0700 Subject: [PATCH 03/10] Moved write flag constants to base module --- ext/call.cc | 4 ---- ext/node_grpc.cc | 11 +++++++++++ index.js | 5 +++++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index 3256f41d..a720f9ee 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -464,10 +464,6 @@ void Call::Init(Handle exports) { NanNew(GetPeer)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); Handle ctr = tpl->GetFunction(); - ctr->Set(NanNew("WRITE_BUFFER_HINT"), - NanNew(GRPC_WRITE_BUFFER_HINT)); - ctr->Set(NanNew("WRITE_NO_COMPRESS"), - NanNew(GRPC_WRITE_NO_COMPRESS)); exports->Set(NanNew("Call"), ctr); constructor = new NanCallback(ctr); } diff --git a/ext/node_grpc.cc b/ext/node_grpc.cc index d93dafda..0cf30da9 100644 --- a/ext/node_grpc.cc +++ b/ext/node_grpc.cc @@ -196,6 +196,16 @@ void InitConnectivityStateConstants(Handle exports) { channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE); } +void InitWriteFlags(Handle exports) { + NanScope(); + Handle write_flags = NanNew(); + exports->Set(NanNew("writeFlags"), write_flags); + Handle BUFFER_HINT(NanNew(GRPC_WRITE_BUFFER_HINT)); + write_flags->Set(NanNew("BUFFER_HINT"), BUFFER_HINT); + Handle NO_COMPRESS(NanNew(GRPC_WRITE_NO_COMPRESS)); + write_flags->Set(NanNew("NO_COMPRESS"), NO_COMPRESS); +} + void init(Handle exports) { NanScope(); grpc_init(); @@ -204,6 +214,7 @@ void init(Handle exports) { InitOpTypeConstants(exports); InitPropagateConstants(exports); InitConnectivityStateConstants(exports); + InitWriteFlags(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/index.js b/index.js index 93c65ac5..889b0ac0 100644 --- a/index.js +++ b/index.js @@ -144,6 +144,11 @@ exports.propagate = grpc.propagate; */ exports.callError = grpc.callError; +/** + * Write flag name to code number mapping + */ +exports.writeFlags = grpc.writeFlags; + /** * Credentials factories */ From ccb6cbe501f5acd6ad29fd8b7f5fb3a0796b3cae Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 17 Aug 2015 14:06:32 -0700 Subject: [PATCH 04/10] Only use encoding for write flags if it is numeric --- src/client.js | 6 +++++- src/server.js | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/client.js b/src/client.js index 3c3642ad..48fe0dd3 100644 --- a/src/client.js +++ b/src/client.js @@ -86,7 +86,11 @@ function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; var message = this.serialize(chunk); - message.grpcWriteFlags = encoding; + if (_.isFinite(encoding)) { + /* Attach the encoding if it is a finite number. This is the closest we + * can get to checking that it is valid flags */ + message.grpcWriteFlags = encoding; + } batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, event) { if (err) { diff --git a/src/server.js b/src/server.js index 3b3bab82..5037abae 100644 --- a/src/server.js +++ b/src/server.js @@ -270,7 +270,11 @@ function _write(chunk, encoding, callback) { this.call.metadataSent = true; } var message = this.serialize(chunk); - message.grpcWriteFlags = encoding; + if (_.isFinite(encoding)) { + /* Attach the encoding if it is a finite number. This is the closest we + * can get to checking that it is valid flags */ + message.grpcWriteFlags = encoding; + } batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, value) { if (err) { From 6730adb9b0d10777a57db2f00bddfbf9c4bc7065 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 19 Aug 2015 08:20:06 -0700 Subject: [PATCH 05/10] update debian install instructions --- README.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7d3d8c7f..61f4a01e 100644 --- a/README.md +++ b/README.md @@ -5,11 +5,10 @@ Alpha : Ready for early adopters ## PREREQUISITES - `node`: This requires `node` to be installed. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package. -- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core. +- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core. ## INSTALLATION -On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. -Run the following command to install gRPC Node.js. +On Mac OS X, install [homebrew][]. Run the following command to install gRPC Node.js. ```sh $ curl -fsSL https://goo.gl/getgrpc | bash -s nodejs ``` @@ -88,5 +87,4 @@ ServerCredentials An object with factory methods for creating credential objects for servers. [homebrew]:http://brew.sh -[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install From 2ba89bc020ae9eef23d7448791403e191da6249f Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 19 Aug 2015 10:43:28 -0700 Subject: [PATCH 06/10] update installation instructions, review feedback --- README.md | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 61f4a01e..08ccedf7 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,26 @@ Alpha : Ready for early adopters - [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core. ## INSTALLATION -On Mac OS X, install [homebrew][]. Run the following command to install gRPC Node.js. + +**Linux (Debian):** + +Add [debian unstable][] (sid) to your `sources.list` file. Example: + +```sh +echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \ +sudo tee -a /etc/apt/sources.list +``` + +Install the gRPC debian package + +```sh +sudo apt-get update +sudo apt-get install libgrpc-dev +``` + +**Mac OS X** + +Install [homebrew][]. Run the following command to install gRPC Node.js. ```sh $ curl -fsSL https://goo.gl/getgrpc | bash -s nodejs ``` @@ -88,3 +107,4 @@ An object with factory methods for creating credential objects for servers. [homebrew]:http://brew.sh [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install +[debian unstable]:https://www.debian.org/releases/sid/ From 3bd3e4094f089c6f32daaaa82c5c484c05bea4a8 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 19 Aug 2015 14:59:11 -0700 Subject: [PATCH 07/10] Stop dereferencing an optional parameter without checking it --- src/client.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/client.js b/src/client.js index 48fe0dd3..7b7eae51 100644 --- a/src/client.js +++ b/src/client.js @@ -280,7 +280,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { } var client_batch = {}; var message = serialize(argument); - message.grpcWriteFlags = options.flags; + if (options) { + message.grpcWriteFlags = options.flags; + } client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; client_batch[grpc.opType.SEND_MESSAGE] = message; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; @@ -416,7 +418,9 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { } var start_batch = {}; var message = serialize(argument); - message.grpcWriteFlags = options.flags; + if (options) { + message.grpcWriteFlags = options.flags; + } start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; start_batch[grpc.opType.SEND_MESSAGE] = message; From d259f1600a42fafd266d148f58016bbb1b51e595 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 19 Aug 2015 15:59:59 -0700 Subject: [PATCH 08/10] update installation instructions, review feedback --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 08ccedf7..a945295f 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Alpha : Ready for early adopters **Linux (Debian):** -Add [debian unstable][] (sid) to your `sources.list` file. Example: +Add [Debian unstable][] to your `sources.list` file. Example: ```sh echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \ @@ -107,4 +107,4 @@ An object with factory methods for creating credential objects for servers. [homebrew]:http://brew.sh [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install -[debian unstable]:https://www.debian.org/releases/sid/ +[Debian unstable]:https://www.debian.org/releases/sid/ From 925b2bdfb0b3746498e11aef4eb44f4940eed4d6 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 19 Aug 2015 16:32:39 -0700 Subject: [PATCH 09/10] update installation instructions, review feedback --- README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a945295f..b6411537 100644 --- a/README.md +++ b/README.md @@ -18,13 +18,19 @@ echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \ sudo tee -a /etc/apt/sources.list ``` -Install the gRPC debian package +Install the gRPC Debian package ```sh sudo apt-get update sudo apt-get install libgrpc-dev ``` +Install the gRPC NPM package + +```sh +npm install grpc +``` + **Mac OS X** Install [homebrew][]. Run the following command to install gRPC Node.js. From d1d2ec50a6c2634d33585735720a961d7ff494fc Mon Sep 17 00:00:00 2001 From: yang-g Date: Fri, 21 Aug 2015 10:43:02 -0700 Subject: [PATCH 10/10] Update node health check service --- health_check/health.js | 10 +++------- health_check/health.proto | 5 ++--- test/health_test.js | 24 ++++++------------------ 3 files changed, 11 insertions(+), 28 deletions(-) diff --git a/health_check/health.js b/health_check/health.js index 87e00197..84d7e056 100644 --- a/health_check/health.js +++ b/health_check/health.js @@ -45,17 +45,13 @@ function HealthImplementation(statusMap) { this.statusMap = _.clone(statusMap); } -HealthImplementation.prototype.setStatus = function(host, service, status) { - if (!this.statusMap[host]) { - this.statusMap[host] = {}; - } - this.statusMap[host][service] = status; +HealthImplementation.prototype.setStatus = function(service, status) { + this.statusMap[service] = status; }; HealthImplementation.prototype.check = function(call, callback){ - var host = call.request.host; var service = call.request.service; - var status = _.get(this.statusMap, [host, service], null); + var status = _.get(this.statusMap, service, null); if (status === null) { callback({code:grpc.status.NOT_FOUND}); } else { diff --git a/health_check/health.proto b/health_check/health.proto index d31df1e0..57f4aaa9 100644 --- a/health_check/health.proto +++ b/health_check/health.proto @@ -32,8 +32,7 @@ syntax = "proto3"; package grpc.health.v1alpha; message HealthCheckRequest { - string host = 1; - string service = 2; + string service = 1; } message HealthCheckResponse { @@ -47,4 +46,4 @@ message HealthCheckResponse { service Health { rpc Check(HealthCheckRequest) returns (HealthCheckResponse); -} \ No newline at end of file +} diff --git a/test/health_test.js b/test/health_test.js index be4ef1d2..04959f5f 100644 --- a/test/health_test.js +++ b/test/health_test.js @@ -41,13 +41,9 @@ var grpc = require('../'); describe('Health Checking', function() { var statusMap = { - '': { - '': 'SERVING', - 'grpc.test.TestService': 'NOT_SERVING', - }, - virtual_host: { - 'grpc.test.TestService': 'SERVING' - } + '': 'SERVING', + 'grpc.test.TestServiceNotServing': 'NOT_SERVING', + 'grpc.test.TestServiceServing': 'SERVING' }; var healthServer = new grpc.Server(); healthServer.addProtoService(health.service, @@ -71,15 +67,15 @@ describe('Health Checking', function() { }); }); it('should say that a disabled service is NOT_SERVING', function(done) { - healthClient.check({service: 'grpc.test.TestService'}, + healthClient.check({service: 'grpc.test.TestServiceNotServing'}, function(err, response) { assert.ifError(err); assert.strictEqual(response.status, 'NOT_SERVING'); done(); }); }); - it('should say that a service on another host is SERVING', function(done) { - healthClient.check({host: 'virtual_host', service: 'grpc.test.TestService'}, + it('should say that an enabled service is SERVING', function(done) { + healthClient.check({service: 'grpc.test.TestServiceServing'}, function(err, response) { assert.ifError(err); assert.strictEqual(response.status, 'SERVING'); @@ -93,12 +89,4 @@ describe('Health Checking', function() { done(); }); }); - it('should get NOT_FOUND if the host is not registered', function(done) { - healthClient.check({host: 'wrong_host', service: 'grpc.test.TestService'}, - function(err, response) { - assert(err); - assert.strictEqual(err.code, grpc.status.NOT_FOUND); - done(); - }); - }); });