diff --git a/README.md b/README.md index 7d3d8c7f..b6411537 100644 --- a/README.md +++ b/README.md @@ -5,11 +5,35 @@ Alpha : Ready for early adopters ## PREREQUISITES - `node`: This requires `node` to be installed. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package. -- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core. +- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core. ## INSTALLATION -On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. -Run the following command to install gRPC Node.js. + +**Linux (Debian):** + +Add [Debian unstable][] to your `sources.list` file. Example: + +```sh +echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \ +sudo tee -a /etc/apt/sources.list +``` + +Install the gRPC Debian package + +```sh +sudo apt-get update +sudo apt-get install libgrpc-dev +``` + +Install the gRPC NPM package + +```sh +npm install grpc +``` + +**Mac OS X** + +Install [homebrew][]. Run the following command to install gRPC Node.js. ```sh $ curl -fsSL https://goo.gl/getgrpc | bash -s nodejs ``` @@ -88,5 +112,5 @@ ServerCredentials An object with factory methods for creating credential objects for servers. [homebrew]:http://brew.sh -[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install +[Debian unstable]:https://www.debian.org/releases/sid/ diff --git a/ext/call.cc b/ext/call.cc index a79a4742..18858fa3 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -207,6 +207,13 @@ class SendMessageOp : public Op { if (!::node::Buffer::HasInstance(value)) { return false; } + Handle object_value = value->ToObject(); + if (object_value->HasOwnProperty(NanNew("grpcWriteFlags"))) { + Handle flag_value = object_value->Get(NanNew("grpcWriteFlags")); + if (flag_value->IsUint32()) { + out->flags = flag_value->Uint32Value() & GRPC_WRITE_USED_MASK; + } + } out->data.send_message = BufferToByteBuffer(value); Persistent *handle = new Persistent(); NanAssignPersistent(*handle, value); @@ -457,10 +464,6 @@ void Call::Init(Handle exports) { NanNew(GetPeer)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); Handle ctr = tpl->GetFunction(); - ctr->Set(NanNew("WRITE_BUFFER_HINT"), - NanNew(GRPC_WRITE_BUFFER_HINT)); - ctr->Set(NanNew("WRITE_NO_COMPRESS"), - NanNew(GRPC_WRITE_NO_COMPRESS)); exports->Set(NanNew("Call"), ctr); constructor = new NanCallback(ctr); } @@ -620,7 +623,7 @@ NAN_METHOD(Call::StartBatch) { call->wrapped_call, &ops[0], nops, new struct tag( callback, op_vector.release(), resources), NULL); if (error != GRPC_CALL_OK) { - return NanThrowError("startBatch failed", error); + return NanThrowError(nanErrorWithCode("startBatch failed", error)); } CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); @@ -634,7 +637,7 @@ NAN_METHOD(Call::Cancel) { Call *call = ObjectWrap::Unwrap(args.This()); grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL); if (error != GRPC_CALL_OK) { - return NanThrowError("cancel failed", error); + return NanThrowError(nanErrorWithCode("cancel failed", error)); } NanReturnUndefined(); } diff --git a/ext/call.h b/ext/call.h index 6acda761..ef6e5fcd 100644 --- a/ext/call.h +++ b/ext/call.h @@ -51,6 +51,19 @@ namespace node { using std::unique_ptr; using std::shared_ptr; +/** + * Helper function for throwing errors with a grpc_call_error value. + * Modified from the answer by Gus Goose to + * http://stackoverflow.com/questions/31794200. + */ +inline v8::Local nanErrorWithCode(const char *msg, + grpc_call_error code) { + NanEscapableScope(); + v8::Local err = NanError(msg).As(); + err->Set(NanNew("code"), NanNew(code)); + return NanEscapeScope(err); +} + v8::Handle ParseMetadata(const grpc_metadata_array *metadata_array); class PersistentHolder { diff --git a/ext/node_grpc.cc b/ext/node_grpc.cc index d93dafda..0cf30da9 100644 --- a/ext/node_grpc.cc +++ b/ext/node_grpc.cc @@ -196,6 +196,16 @@ void InitConnectivityStateConstants(Handle exports) { channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE); } +void InitWriteFlags(Handle exports) { + NanScope(); + Handle write_flags = NanNew(); + exports->Set(NanNew("writeFlags"), write_flags); + Handle BUFFER_HINT(NanNew(GRPC_WRITE_BUFFER_HINT)); + write_flags->Set(NanNew("BUFFER_HINT"), BUFFER_HINT); + Handle NO_COMPRESS(NanNew(GRPC_WRITE_NO_COMPRESS)); + write_flags->Set(NanNew("NO_COMPRESS"), NO_COMPRESS); +} + void init(Handle exports) { NanScope(); grpc_init(); @@ -204,6 +214,7 @@ void init(Handle exports) { InitOpTypeConstants(exports); InitPropagateConstants(exports); InitConnectivityStateConstants(exports); + InitWriteFlags(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/ext/server.cc b/ext/server.cc index 57c43104..32a8ff55 100644 --- a/ext/server.cc +++ b/ext/server.cc @@ -233,7 +233,7 @@ NAN_METHOD(Server::RequestCall) { new struct tag(new NanCallback(args[0].As()), ops.release(), shared_ptr(nullptr))); if (error != GRPC_CALL_OK) { - return NanThrowError("requestCall failed", error); + return NanThrowError(nanErrorWithCode("requestCall failed", error)); } CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); diff --git a/health_check/health.js b/health_check/health.js index 87e00197..84d7e056 100644 --- a/health_check/health.js +++ b/health_check/health.js @@ -45,17 +45,13 @@ function HealthImplementation(statusMap) { this.statusMap = _.clone(statusMap); } -HealthImplementation.prototype.setStatus = function(host, service, status) { - if (!this.statusMap[host]) { - this.statusMap[host] = {}; - } - this.statusMap[host][service] = status; +HealthImplementation.prototype.setStatus = function(service, status) { + this.statusMap[service] = status; }; HealthImplementation.prototype.check = function(call, callback){ - var host = call.request.host; var service = call.request.service; - var status = _.get(this.statusMap, [host, service], null); + var status = _.get(this.statusMap, service, null); if (status === null) { callback({code:grpc.status.NOT_FOUND}); } else { diff --git a/health_check/health.proto b/health_check/health.proto index d31df1e0..57f4aaa9 100644 --- a/health_check/health.proto +++ b/health_check/health.proto @@ -32,8 +32,7 @@ syntax = "proto3"; package grpc.health.v1alpha; message HealthCheckRequest { - string host = 1; - string service = 2; + string service = 1; } message HealthCheckResponse { @@ -47,4 +46,4 @@ message HealthCheckResponse { service Health { rpc Check(HealthCheckRequest) returns (HealthCheckResponse); -} \ No newline at end of file +} diff --git a/index.js b/index.js index 93c65ac5..889b0ac0 100644 --- a/index.js +++ b/index.js @@ -144,6 +144,11 @@ exports.propagate = grpc.propagate; */ exports.callError = grpc.callError; +/** + * Write flag name to code number mapping + */ +exports.writeFlags = grpc.writeFlags; + /** * Credentials factories */ diff --git a/src/client.js b/src/client.js index 50cbf4a1..7b7eae51 100644 --- a/src/client.js +++ b/src/client.js @@ -79,13 +79,19 @@ function ClientWritableStream(call, serialize) { * implementation of a method needed for implementing stream.Writable. * @access private * @param {Buffer} chunk The chunk to write - * @param {string} encoding Ignored + * @param {string} encoding Used to pass write flags * @param {function(Error=)} callback Called when the write is complete */ function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; - batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + var message = this.serialize(chunk); + if (_.isFinite(encoding)) { + /* Attach the encoding if it is a finite number. This is the closest we + * can get to checking that it is valid flags */ + message.grpcWriteFlags = encoding; + } + batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, event) { if (err) { // Something has gone wrong. Stop writing by failing to call callback @@ -273,8 +279,12 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { return; } var client_batch = {}; + var message = serialize(argument); + if (options) { + message.grpcWriteFlags = options.flags; + } client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; - client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + client_batch[grpc.opType.SEND_MESSAGE] = message; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; client_batch[grpc.opType.RECV_MESSAGE] = true; @@ -407,9 +417,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { return; } var start_batch = {}; + var message = serialize(argument); + if (options) { + message.grpcWriteFlags = options.flags; + } start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + start_batch[grpc.opType.SEND_MESSAGE] = message; start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(start_batch, function(err, response) { if (err) { diff --git a/src/server.js b/src/server.js index f2520c3c..137f60ed 100644 --- a/src/server.js +++ b/src/server.js @@ -115,8 +115,10 @@ function waitForCancel(call, emitter) { * @param {function(*):Buffer=} serialize Serialization function for the * response * @param {Object=} metadata Optional trailing metadata to send with status + * @param {number=} flags Flags for modifying how the message is sent. + * Defaults to 0. */ -function sendUnaryResponse(call, value, serialize, metadata) { +function sendUnaryResponse(call, value, serialize, metadata, flags) { var end_batch = {}; var status = { code: grpc.status.OK, @@ -130,7 +132,9 @@ function sendUnaryResponse(call, value, serialize, metadata) { end_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; call.metadataSent = true; } - end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); + var message = serialize(value); + message.grpcWriteFlags = flags; + end_batch[grpc.opType.SEND_MESSAGE] = message; end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(end_batch, function (){}); } @@ -254,7 +258,7 @@ function ServerWritableStream(call, serialize) { * for implementing stream.Writable. * @access private * @param {Buffer} chunk The chunk of data to write - * @param {string} encoding Ignored + * @param {string} encoding Used to pass write flags * @param {function(Error=)} callback Callback to indicate that the write is * complete */ @@ -265,7 +269,13 @@ function _write(chunk, encoding, callback) { batch[grpc.opType.SEND_INITIAL_METADATA] = {}; this.call.metadataSent = true; } - batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + var message = this.serialize(chunk); + if (_.isFinite(encoding)) { + /* Attach the encoding if it is a finite number. This is the closest we + * can get to checking that it is valid flags */ + message.grpcWriteFlags = encoding; + } + batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, value) { if (err) { this.emit('error', err); @@ -450,14 +460,14 @@ function handleUnary(call, handler, metadata) { if (emitter.cancelled) { return; } - handler.func(emitter, function sendUnaryData(err, value, trailer) { + handler.func(emitter, function sendUnaryData(err, value, trailer, flags) { if (err) { if (trailer) { err.metadata = trailer; } handleError(call, err); } else { - sendUnaryResponse(call, value, handler.serialize, trailer); + sendUnaryResponse(call, value, handler.serialize, trailer, flags); } }); }); @@ -514,7 +524,7 @@ function handleClientStreaming(call, handler, metadata) { }); waitForCancel(call, stream); stream.metadata = metadata; - handler.func(stream, function(err, value, trailer) { + handler.func(stream, function(err, value, trailer, flags) { stream.terminate(); if (err) { if (trailer) { @@ -522,7 +532,7 @@ function handleClientStreaming(call, handler, metadata) { } handleError(call, err); } else { - sendUnaryResponse(call, value, handler.serialize, trailer); + sendUnaryResponse(call, value, handler.serialize, trailer, flags); } }); } diff --git a/test/health_test.js b/test/health_test.js index 22c58d39..9267bff7 100644 --- a/test/health_test.js +++ b/test/health_test.js @@ -41,13 +41,9 @@ var grpc = require('../'); describe('Health Checking', function() { var statusMap = { - '': { - '': 'SERVING', - 'grpc.test.TestService': 'NOT_SERVING', - }, - virtual_host: { - 'grpc.test.TestService': 'SERVING' - } + '': 'SERVING', + 'grpc.test.TestServiceNotServing': 'NOT_SERVING', + 'grpc.test.TestServiceServing': 'SERVING' }; var healthServer = new grpc.Server(); healthServer.addProtoService(health.service, @@ -71,15 +67,15 @@ describe('Health Checking', function() { }); }); it('should say that a disabled service is NOT_SERVING', function(done) { - healthClient.check({service: 'grpc.test.TestService'}, + healthClient.check({service: 'grpc.test.TestServiceNotServing'}, function(err, response) { assert.ifError(err); assert.strictEqual(response.status, 'NOT_SERVING'); done(); }); }); - it('should say that a service on another host is SERVING', function(done) { - healthClient.check({host: 'virtual_host', service: 'grpc.test.TestService'}, + it('should say that an enabled service is SERVING', function(done) { + healthClient.check({service: 'grpc.test.TestServiceServing'}, function(err, response) { assert.ifError(err); assert.strictEqual(response.status, 'SERVING'); @@ -93,12 +89,4 @@ describe('Health Checking', function() { done(); }); }); - it('should get NOT_FOUND if the host is not registered', function(done) { - healthClient.check({host: 'wrong_host', service: 'grpc.test.TestService'}, - function(err, response) { - assert(err); - assert.strictEqual(err.code, grpc.status.NOT_FOUND); - done(); - }); - }); });