diff --git a/binding.gyp b/binding.gyp index fe4b5da9..cf2a6acb 100644 --- a/binding.gyp +++ b/binding.gyp @@ -28,17 +28,17 @@ }, "target_name": "grpc", "sources": [ - "byte_buffer.cc", - "call.cc", - "channel.cc", - "completion_queue_async_worker.cc", - "credentials.cc", - "event.cc", - "node_grpc.cc", - "server.cc", - "server_credentials.cc", - "tag.cc", - "timeval.cc" + "ext/byte_buffer.cc", + "ext/call.cc", + "ext/channel.cc", + "ext/completion_queue_async_worker.cc", + "ext/credentials.cc", + "ext/event.cc", + "ext/node_grpc.cc", + "ext/server.cc", + "ext/server_credentials.cc", + "ext/tag.cc", + "ext/timeval.cc" ], 'conditions' : [ ['no_install=="yes"', { diff --git a/examples/math_server.js b/examples/math_server.js index d649b4fd..e65cfe30 100644 --- a/examples/math_server.js +++ b/examples/math_server.js @@ -52,7 +52,8 @@ var Server = grpc.buildServer([math.Math.service]); */ function mathDiv(call, cb) { var req = call.request; - if (req.divisor == 0) { + // Unary + is explicit coersion to integer + if (+req.divisor === 0) { cb(new Error('cannot divide by zero')); } cb(null, { @@ -89,7 +90,7 @@ function mathSum(call, cb) { // Here, call is a standard readable Node object Stream var sum = 0; call.on('data', function(data) { - sum += data.num | 0; + sum += (+data.num); }); call.on('end', function() { cb(null, {num: sum}); @@ -104,7 +105,7 @@ function mathDivMany(stream) { Transform.call(this, options); } DivTransform.prototype._transform = function(div_args, encoding, callback) { - if (div_args.divisor == 0) { + if (+div_args.divisor === 0) { callback(new Error('cannot divide by zero')); } callback(null, { diff --git a/byte_buffer.cc b/ext/byte_buffer.cc similarity index 100% rename from byte_buffer.cc rename to ext/byte_buffer.cc diff --git a/byte_buffer.h b/ext/byte_buffer.h similarity index 100% rename from byte_buffer.h rename to ext/byte_buffer.h diff --git a/call.cc b/ext/call.cc similarity index 100% rename from call.cc rename to ext/call.cc diff --git a/call.h b/ext/call.h similarity index 100% rename from call.h rename to ext/call.h diff --git a/channel.cc b/ext/channel.cc similarity index 100% rename from channel.cc rename to ext/channel.cc diff --git a/channel.h b/ext/channel.h similarity index 100% rename from channel.h rename to ext/channel.h diff --git a/completion_queue_async_worker.cc b/ext/completion_queue_async_worker.cc similarity index 100% rename from completion_queue_async_worker.cc rename to ext/completion_queue_async_worker.cc diff --git a/completion_queue_async_worker.h b/ext/completion_queue_async_worker.h similarity index 100% rename from completion_queue_async_worker.h rename to ext/completion_queue_async_worker.h diff --git a/credentials.cc b/ext/credentials.cc similarity index 100% rename from credentials.cc rename to ext/credentials.cc diff --git a/credentials.h b/ext/credentials.h similarity index 100% rename from credentials.h rename to ext/credentials.h diff --git a/event.cc b/ext/event.cc similarity index 100% rename from event.cc rename to ext/event.cc diff --git a/event.h b/ext/event.h similarity index 100% rename from event.h rename to ext/event.h diff --git a/node_grpc.cc b/ext/node_grpc.cc similarity index 100% rename from node_grpc.cc rename to ext/node_grpc.cc diff --git a/server.cc b/ext/server.cc similarity index 100% rename from server.cc rename to ext/server.cc diff --git a/server.h b/ext/server.h similarity index 100% rename from server.h rename to ext/server.h diff --git a/server_credentials.cc b/ext/server_credentials.cc similarity index 100% rename from server_credentials.cc rename to ext/server_credentials.cc diff --git a/server_credentials.h b/ext/server_credentials.h similarity index 100% rename from server_credentials.h rename to ext/server_credentials.h diff --git a/tag.cc b/ext/tag.cc similarity index 100% rename from tag.cc rename to ext/tag.cc diff --git a/tag.h b/ext/tag.h similarity index 100% rename from tag.h rename to ext/tag.h diff --git a/timeval.cc b/ext/timeval.cc similarity index 100% rename from timeval.cc rename to ext/timeval.cc diff --git a/timeval.h b/ext/timeval.h similarity index 100% rename from timeval.h rename to ext/timeval.h diff --git a/main.js b/index.js similarity index 96% rename from main.js rename to index.js index 751c3525..0627e7f5 100644 --- a/main.js +++ b/index.js @@ -35,9 +35,9 @@ var _ = require('underscore'); var ProtoBuf = require('protobufjs'); -var surface_client = require('./surface_client.js'); +var surface_client = require('./src/surface_client.js'); -var surface_server = require('./surface_server.js'); +var surface_server = require('./src/surface_server.js'); var grpc = require('bindings')('grpc'); diff --git a/interop/interop_client.js b/interop/interop_client.js index cf75b9a7..9306317b 100644 --- a/interop/interop_client.js +++ b/interop/interop_client.js @@ -183,7 +183,7 @@ function pingPong(client, done) { assert.equal(response.payload.body.limit - response.payload.body.offset, response_sizes[index]); index += 1; - if (index == 4) { + if (index === 4) { call.end(); } else { call.write({ diff --git a/package.json b/package.json index 5f3c6fa3..8a0b51dd 100644 --- a/package.json +++ b/package.json @@ -17,5 +17,5 @@ "mocha": "~1.21.0", "minimist": "^1.1.0" }, - "main": "main.js" + "main": "index.js" } diff --git a/client.js b/src/client.js similarity index 94% rename from client.js rename to src/client.js index 2fefd14b..3a1c9eef 100644 --- a/client.js +++ b/src/client.js @@ -105,7 +105,7 @@ function GrpcClientStream(call, serialize, deserialize) { return; } var data = event.data; - if (self.push(data) && data != null) { + if (self.push(self.deserialize(data)) && data != null) { self._call.startRead(readCallback); } else { reading = false; @@ -155,11 +155,19 @@ GrpcClientStream.prototype._read = function(size) { */ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { var self = this; - self._call.startWrite(chunk, function(event) { + self._call.startWrite(self.serialize(chunk), function(event) { callback(); }, 0); }; +/** + * Cancel the ongoing call. If the call has not already finished, it will finish + * with status CANCELLED. + */ +GrpcClientStream.prototype.cancel = function() { + this._call.cancel(); +}; + /** * Make a request on the channel to the given method with the given arguments * @param {grpc.Channel} channel The channel on which to make the request @@ -185,7 +193,7 @@ function makeRequest(channel, if (metadata) { call.addMetadata(metadata); } - return new GrpcClientStream(call); + return new GrpcClientStream(call, serialize, deserialize); } /** diff --git a/common.js b/src/common.js similarity index 100% rename from common.js rename to src/common.js diff --git a/server.js b/src/server.js similarity index 98% rename from server.js rename to src/server.js index eca20aa5..03cdbe6f 100644 --- a/server.js +++ b/src/server.js @@ -151,7 +151,7 @@ function GrpcServerStream(call, serialize, deserialize) { return; } var data = event.data; - if (self.push(deserialize(data)) && data != null) { + if (self.push(self.deserialize(data)) && data != null) { self._call.startRead(readCallback); } else { reading = false; @@ -233,7 +233,7 @@ function Server(options) { function handleNewCall(event) { var call = event.call; var data = event.data; - if (data == null) { + if (data === null) { return; } server.requestCall(handleNewCall); @@ -246,6 +246,7 @@ function Server(options) { call.serverAccept(function(event) { if (event.data.code === grpc.status.CANCELLED) { cancelled = true; + stream.emit('cancelled'); } }, 0); call.serverEndInitialMetadata(0); diff --git a/surface_client.js b/src/surface_client.js similarity index 79% rename from surface_client.js rename to src/surface_client.js index 996e3d10..16c31809 100644 --- a/surface_client.js +++ b/src/surface_client.js @@ -63,80 +63,25 @@ util.inherits(ClientReadableObjectStream, Readable); * client side. Extends from stream.Readable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ClientReadableObjectStream(stream, deserialize, options) { - options = _.extend(options, {objectMode: true}); +function ClientReadableObjectStream(stream) { + var options = {objectMode: true}; Readable.call(this, options); this._stream = stream; var self = this; forwardEvent(stream, this, 'status'); forwardEvent(stream, this, 'metadata'); this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { + if (!self.push(chunk)) { self._stream.pause(); } }); this._stream.pause(); } -util.inherits(ClientWritableObjectStream, Writable); - -/** - * Class for representing a gRPC client streaming call as a Node stream on the - * client side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {object} options Stream options - */ -function ClientWritableObjectStream(stream, serialize, options) { - options = _.extend(options, {objectMode: true}); - Writable.call(this, options); - this._stream = stream; - this._serialize = serialize; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this.on('finish', function() { - this._stream.end(); - }); -} - - -util.inherits(ClientBidiObjectStream, Duplex); - -/** - * Class for representing a gRPC bidi streaming call as a Node stream on the - * client side. Extends from stream.Duplex. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options - */ -function ClientBidiObjectStream(stream, serialize, deserialize, options) { - options = _.extend(options, {objectMode: true}); - Duplex.call(this, options); - this._stream = stream; - this._serialize = serialize; - var self = this; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { - self._stream.pause(); - } - }); - this._stream.pause(); - this.on('finish', function() { - this._stream.end(); - }); -} - /** * _read implementation for both types of streams that allow reading. - * @this {ClientReadableObjectStream|ClientBidiObjectStream} + * @this {ClientReadableObjectStream} * @param {number} size Ignored */ function _read(size) { @@ -147,30 +92,51 @@ function _read(size) { * See docs for _read */ ClientReadableObjectStream.prototype._read = _read; + +util.inherits(ClientWritableObjectStream, Writable); + /** - * See docs for _read + * Class for representing a gRPC client streaming call as a Node stream on the + * client side. Extends from stream.Writable. + * @constructor + * @param {stream} stream Underlying binary Duplex stream for the call */ -ClientBidiObjectStream.prototype._read = _read; +function ClientWritableObjectStream(stream) { + var options = {objectMode: true}; + Writable.call(this, options); + this._stream = stream; + forwardEvent(stream, this, 'status'); + forwardEvent(stream, this, 'metadata'); + this.on('finish', function() { + this._stream.end(); + }); +} /** * _write implementation for both types of streams that allow writing - * @this {ClientWritableObjectStream|ClientBidiObjectStream} + * @this {ClientWritableObjectStream} * @param {*} chunk The value to write to the stream * @param {string} encoding Ignored * @param {function(Error)} callback Callback to call when finished writing */ function _write(chunk, encoding, callback) { - this._stream.write(this._serialize(chunk), encoding, callback); + this._stream.write(chunk, encoding, callback); } /** * See docs for _write */ ClientWritableObjectStream.prototype._write = _write; + /** - * See docs for _write + * Cancel the underlying call */ -ClientBidiObjectStream.prototype._write = _write; +function cancel() { + this._stream.cancel(); +} + +ClientReadableObjectStream.prototype.cancel = cancel; +ClientWritableObjectStream.prototype.cancel = cancel; /** * Get a function that can make unary requests to the specified method. @@ -196,19 +162,28 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeUnaryRequest(argument, callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); var emitter = new EventEmitter(); + emitter.cancel = function cancel() { + stream.cancel(); + }; forwardEvent(stream, emitter, 'status'); forwardEvent(stream, emitter, 'metadata'); - stream.write(serialize(argument)); + stream.write(argument); stream.end(); stream.on('data', function forwardData(chunk) { try { - callback(null, deserialize(chunk)); + callback(null, chunk); } catch (e) { callback(e); } }); + stream.on('status', function forwardStatus(status) { + if (status.code !== client.status.OK) { + callback(status); + } + }); return emitter; } return makeUnaryRequest; @@ -236,15 +211,21 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeClientStreamRequest(callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientWritableObjectStream(stream, serialize, {}); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); + var obj_stream = new ClientWritableObjectStream(stream); stream.on('data', function forwardData(chunk) { try { - callback(null, deserialize(chunk)); + callback(null, chunk); } catch (e) { callback(e); } }); + stream.on('status', function forwardStatus(status) { + if (status.code !== client.status.OK) { + callback(status); + } + }); return obj_stream; } return makeClientStreamRequest; @@ -272,9 +253,10 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeServerStreamRequest(argument, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientReadableObjectStream(stream, deserialize, {}); - stream.write(serialize(argument)); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); + var obj_stream = new ClientReadableObjectStream(stream); + stream.write(argument); stream.end(); return obj_stream; } @@ -301,12 +283,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeBidiStreamRequest(metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientBidiObjectStream(stream, - serialize, - deserialize, - {}); - return obj_stream; + return client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); } return makeBidiStreamRequest; } diff --git a/surface_server.js b/src/surface_server.js similarity index 71% rename from surface_server.js rename to src/surface_server.js index bc688839..af23ec21 100644 --- a/surface_server.js +++ b/src/surface_server.js @@ -54,19 +54,20 @@ util.inherits(ServerReadableObjectStream, Readable); * server side. Extends from stream.Readable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ServerReadableObjectStream(stream, deserialize, options) { - options = _.extend(options, {objectMode: true}); +function ServerReadableObjectStream(stream) { + var options = {objectMode: true}; Readable.call(this, options); this._stream = stream; Object.defineProperty(this, 'cancelled', { get: function() { return stream.cancelled; } }); var self = this; + this._stream.on('cancelled', function() { + self.emit('cancelled'); + }); this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { + if (!self.push(chunk)) { self._stream.pause(); } }); @@ -76,57 +77,6 @@ function ServerReadableObjectStream(stream, deserialize, options) { this._stream.pause(); } -util.inherits(ServerWritableObjectStream, Writable); - -/** - * Class for representing a gRPC server streaming call as a Node stream on the - * server side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {object} options Stream options - */ -function ServerWritableObjectStream(stream, serialize, options) { - options = _.extend(options, {objectMode: true}); - Writable.call(this, options); - this._stream = stream; - this._serialize = serialize; - this.on('finish', function() { - this._stream.end(); - }); -} - -util.inherits(ServerBidiObjectStream, Duplex); - -/** - * Class for representing a gRPC bidi streaming call as a Node stream on the - * server side. Extends from stream.Duplex. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options - */ -function ServerBidiObjectStream(stream, serialize, deserialize, options) { - options = _.extend(options, {objectMode: true}); - Duplex.call(this, options); - this._stream = stream; - this._serialize = serialize; - var self = this; - this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { - self._stream.pause(); - } - }); - this._stream.on('end', function forwardEnd() { - self.push(null); - }); - this._stream.pause(); - this.on('finish', function() { - this._stream.end(); - }); -} - /** * _read implementation for both types of streams that allow reading. * @this {ServerReadableObjectStream|ServerBidiObjectStream} @@ -140,39 +90,49 @@ function _read(size) { * See docs for _read */ ServerReadableObjectStream.prototype._read = _read; + +util.inherits(ServerWritableObjectStream, Writable); + /** - * See docs for _read + * Class for representing a gRPC server streaming call as a Node stream on the + * server side. Extends from stream.Writable. + * @constructor + * @param {stream} stream Underlying binary Duplex stream for the call */ -ServerBidiObjectStream.prototype._read = _read; +function ServerWritableObjectStream(stream) { + var options = {objectMode: true}; + Writable.call(this, options); + this._stream = stream; + this._stream.on('cancelled', function() { + self.emit('cancelled'); + }); + this.on('finish', function() { + this._stream.end(); + }); +} /** * _write implementation for both types of streams that allow writing - * @this {ServerWritableObjectStream|ServerBidiObjectStream} + * @this {ServerWritableObjectStream} * @param {*} chunk The value to write to the stream * @param {string} encoding Ignored * @param {function(Error)} callback Callback to call when finished writing */ function _write(chunk, encoding, callback) { - this._stream.write(this._serialize(chunk), encoding, callback); + this._stream.write(chunk, encoding, callback); } /** * See docs for _write */ ServerWritableObjectStream.prototype._write = _write; -/** - * See docs for _write - */ -ServerBidiObjectStream.prototype._write = _write; /** * Creates a binary stream handler function from a unary handler function * @param {function(Object, function(Error, *))} handler Unary call handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeUnaryHandler(handler, serialize, deserialize) { +function makeUnaryHandler(handler) { /** * Handles a stream by reading a single data value, passing it to the handler, * and writing the response back to the stream. @@ -180,15 +140,18 @@ function makeUnaryHandler(handler, serialize, deserialize) { */ return function handleUnaryCall(stream) { stream.on('data', function handleUnaryData(value) { - var call = {request: deserialize(value)}; + var call = {request: value}; Object.defineProperty(call, 'cancelled', { get: function() { return stream.cancelled;} }); + stream.on('cancelled', function() { + call.emit('cancelled'); + }); handler(call, function sendUnaryData(err, value) { if (err) { stream.emit('error', err); } else { - stream.write(serialize(value)); + stream.write(value); stream.end(); } }); @@ -201,23 +164,21 @@ function makeUnaryHandler(handler, serialize, deserialize) { * function * @param {function(Readable, function(Error, *))} handler Client stream call * handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeClientStreamHandler(handler, serialize, deserialize) { +function makeClientStreamHandler(handler) { /** * Handles a stream by passing a deserializing stream to the handler and * writing the response back to the stream. * @param {stream} stream Binary data stream */ return function handleClientStreamCall(stream) { - var object_stream = new ServerReadableObjectStream(stream, deserialize, {}); + var object_stream = new ServerReadableObjectStream(stream); handler(object_stream, function sendClientStreamData(err, value) { if (err) { stream.emit('error', err); } else { - stream.write(serialize(value)); + stream.write(value); stream.end(); } }); @@ -228,11 +189,9 @@ function makeClientStreamHandler(handler, serialize, deserialize) { * Creates a binary stream handler function from a server stream handler * function * @param {function(Writable)} handler Server stream call handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeServerStreamHandler(handler, serialize, deserialize) { +function makeServerStreamHandler(handler) { /** * Handles a stream by attaching it to a serializing stream, and passing it to * the handler. @@ -240,10 +199,8 @@ function makeServerStreamHandler(handler, serialize, deserialize) { */ return function handleServerStreamCall(stream) { stream.on('data', function handleClientData(value) { - var object_stream = new ServerWritableObjectStream(stream, - serialize, - {}); - object_stream.request = deserialize(value); + var object_stream = new ServerWritableObjectStream(stream); + object_stream.request = value; handler(object_stream); }); }; @@ -252,23 +209,10 @@ function makeServerStreamHandler(handler, serialize, deserialize) { /** * Creates a binary stream handler function from a bidi stream handler function * @param {function(Duplex)} handler Unary call handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeBidiStreamHandler(handler, serialize, deserialize) { - /** - * Handles a stream by wrapping it in a serializing and deserializing object - * stream, and passing it to the handler. - * @param {stream} stream Binary data stream - */ - return function handleBidiStreamCall(stream) { - var object_stream = new ServerBidiObjectStream(stream, - serialize, - deserialize, - {}); - handler(object_stream); - }; +function makeBidiStreamHandler(handler) { + return handler; } /** @@ -341,10 +285,13 @@ function makeServerConstructor(services) { common.fullyQualifiedName(method) + ' not provided.'); } var binary_handler = handler_makers[method_type]( - service_handlers[service_name][decapitalize(method.name)], - common.serializeCls(method.resolvedResponseType.build()), - common.deserializeCls(method.resolvedRequestType.build())); - server.register(prefix + capitalize(method.name), binary_handler); + service_handlers[service_name][decapitalize(method.name)]); + var serialize = common.serializeCls( + method.resolvedResponseType.build()); + var deserialize = common.deserializeCls( + method.resolvedRequestType.build()); + server.register(prefix + capitalize(method.name), binary_handler, + serialize, deserialize); }); }, this); } diff --git a/test/call_test.js b/test/call_test.js index 6e52ec89..b37c44ab 100644 --- a/test/call_test.js +++ b/test/call_test.js @@ -34,8 +34,6 @@ var assert = require('assert'); var grpc = require('bindings')('grpc.node'); -var channel = new grpc.Channel('localhost:7070'); - /** * Helper function to return an absolute deadline given a relative timeout in * seconds. @@ -49,6 +47,17 @@ function getDeadline(timeout_secs) { } describe('call', function() { + var channel; + var server; + before(function() { + server = new grpc.Server(); + var port = server.addHttp2Port('localhost:0'); + server.start(); + channel = new grpc.Channel('localhost:' + port); + }); + after(function() { + server.shutdown(); + }); describe('constructor', function() { it('should reject anything less than 3 arguments', function() { assert.throws(function() { diff --git a/test/client_server_test.js b/test/client_server_test.js index 2a259086..d657ef41 100644 --- a/test/client_server_test.js +++ b/test/client_server_test.js @@ -35,9 +35,9 @@ var assert = require('assert'); var fs = require('fs'); var path = require('path'); var grpc = require('bindings')('grpc.node'); -var Server = require('../server'); -var client = require('../client'); -var common = require('../common'); +var Server = require('../src/server'); +var client = require('../src/client'); +var common = require('../src/common'); var _ = require('highland'); var ca_path = path.join(__dirname, 'data/ca.pem'); @@ -77,15 +77,32 @@ function errorHandler(stream) { }; } +/** + * Wait for a cancellation instead of responding + * @param {Stream} stream + */ +function cancelHandler(stream) { + // do nothing +} + describe('echo client', function() { - it('should receive echo responses', function(done) { - var server = new Server(); + var server; + var channel; + before(function() { + server = new Server(); var port_num = server.bind('0.0.0.0:0'); server.register('echo', echoHandler); + server.register('error', errorHandler); + server.register('cancellation', cancelHandler); server.start(); + channel = new grpc.Channel('localhost:' + port_num); + }); + after(function() { + server.shutdown(); + }); + it('should receive echo responses', function(done) { var messages = ['echo1', 'echo2', 'echo3', 'echo4']; - var channel = new grpc.Channel('localhost:' + port_num); var stream = client.makeRequest( channel, 'echo'); @@ -97,78 +114,91 @@ describe('echo client', function() { assert.equal(messages[index], chunk.toString()); index += 1; }); + stream.on('end', function() { + done(); + }); + }); + it('should get an error status that the server throws', function(done) { + var stream = client.makeRequest( + channel, + 'error', + null, + getDeadline(1)); + + stream.on('data', function() {}); + stream.write(new Buffer('test')); + stream.end(); + stream.on('status', function(status) { + assert.equal(status.code, grpc.status.UNIMPLEMENTED); + assert.equal(status.details, 'error details'); + done(); + }); + }); + it('should be able to cancel a call', function(done) { + var stream = client.makeRequest( + channel, + 'cancellation', + null, + getDeadline(1)); + + stream.cancel(); + stream.on('status', function(status) { + assert.equal(status.code, grpc.status.CANCELLED); + done(); + }); + }); +}); +/* TODO(mlumish): explore options for reducing duplication between this test + * and the insecure echo client test */ +describe('secure echo client', function() { + var server; + var channel; + before(function(done) { + fs.readFile(ca_path, function(err, ca_data) { + assert.ifError(err); + fs.readFile(key_path, function(err, key_data) { + assert.ifError(err); + fs.readFile(pem_path, function(err, pem_data) { + assert.ifError(err); + var creds = grpc.Credentials.createSsl(ca_data); + var server_creds = grpc.ServerCredentials.createSsl(null, + key_data, + pem_data); + + server = new Server({'credentials' : server_creds}); + var port_num = server.bind('0.0.0.0:0', true); + server.register('echo', echoHandler); + server.start(); + + channel = new grpc.Channel('localhost:' + port_num, { + 'grpc.ssl_target_name_override' : 'foo.test.google.com', + 'credentials' : creds + }); + done(); + }); + }); + }); + }); + after(function() { + server.shutdown(); + }); + it('should recieve echo responses', function(done) { + var messages = ['echo1', 'echo2', 'echo3', 'echo4']; + var stream = client.makeRequest( + channel, + 'echo'); + + _(messages).map(function(val) { + return new Buffer(val); + }).pipe(stream); + var index = 0; + stream.on('data', function(chunk) { + assert.equal(messages[index], chunk.toString()); + index += 1; + }); stream.on('end', function() { server.shutdown(); done(); }); }); - it('should get an error status that the server throws', function(done) { - var server = new Server(); - var port_num = server.bind('0.0.0.0:0'); - server.register('error', errorHandler); - server.start(); - - var channel = new grpc.Channel('localhost:' + port_num); - var stream = client.makeRequest( - channel, - 'error', - null, - getDeadline(1)); - - stream.on('data', function() {}); - stream.write(new Buffer('test')); - stream.end(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.UNIMPLEMENTED); - assert.equal(status.details, 'error details'); - server.shutdown(); - done(); - }); - }); -}); -/* TODO(mlumish): explore options for reducing duplication between this test - * and the insecure echo client test */ -describe('secure echo client', function() { - it('should recieve echo responses', function(done) { - fs.readFile(ca_path, function(err, ca_data) { - assert.ifError(err); - fs.readFile(key_path, function(err, key_data) { - assert.ifError(err); - fs.readFile(pem_path, function(err, pem_data) { - assert.ifError(err); - var creds = grpc.Credentials.createSsl(ca_data); - var server_creds = grpc.ServerCredentials.createSsl(null, - key_data, - pem_data); - - var server = new Server({'credentials' : server_creds}); - var port_num = server.bind('0.0.0.0:0', true); - server.register('echo', echoHandler); - server.start(); - - var messages = ['echo1', 'echo2', 'echo3', 'echo4']; - var channel = new grpc.Channel('localhost:' + port_num, { - 'grpc.ssl_target_name_override' : 'foo.test.google.com', - 'credentials' : creds - }); - var stream = client.makeRequest( - channel, - 'echo'); - - _(messages).map(function(val) { - return new Buffer(val); - }).pipe(stream); - var index = 0; - stream.on('data', function(chunk) { - assert.equal(messages[index], chunk.toString()); - index += 1; - }); - stream.on('end', function() { - server.shutdown(); - done(); - }); - }); - }); - }); - }); }); diff --git a/test/end_to_end_test.js b/test/end_to_end_test.js index f7ccbcf5..f8cb660d 100644 --- a/test/end_to_end_test.js +++ b/test/end_to_end_test.js @@ -56,14 +56,21 @@ function multiDone(done, count) { } describe('end-to-end', function() { + var server; + var channel; + before(function() { + server = new grpc.Server(); + var port_num = server.addHttp2Port('0.0.0.0:0'); + server.start(); + channel = new grpc.Channel('localhost:' + port_num); + }); + after(function() { + server.shutdown(); + }); it('should start and end a request without error', function(complete) { - var server = new grpc.Server(); var done = multiDone(function() { complete(); - server.shutdown(); }, 2); - var port_num = server.addHttp2Port('0.0.0.0:0'); - var channel = new grpc.Channel('localhost:' + port_num); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); var status_text = 'xyz'; @@ -81,7 +88,6 @@ describe('end-to-end', function() { done(); }, 0); - server.start(); server.requestCall(function(event) { assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); var server_call = event.call; @@ -109,13 +115,10 @@ describe('end-to-end', function() { it('should send and receive data without error', function(complete) { var req_text = 'client_request'; var reply_text = 'server_response'; - var server = new grpc.Server(); var done = multiDone(function() { complete(); server.shutdown(); }, 6); - var port_num = server.addHttp2Port('0.0.0.0:0'); - var channel = new grpc.Channel('localhost:' + port_num); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); var status_text = 'success'; @@ -151,8 +154,6 @@ describe('end-to-end', function() { assert.strictEqual(event.data.toString(), reply_text); done(); }); - - server.start(); server.requestCall(function(event) { assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); var server_call = event.call; diff --git a/test/interop_sanity_test.js b/test/interop_sanity_test.js index 3c062b97..6cc7d444 100644 --- a/test/interop_sanity_test.js +++ b/test/interop_sanity_test.js @@ -48,6 +48,9 @@ describe('Interop tests', function() { port = 'localhost:' + server_obj.port; done(); }); + after(function() { + server.shutdown(); + }); // This depends on not using a binary stream it('should pass empty_unary', function(done) { interop_client.runTest(port, name_override, 'empty_unary', true, done); @@ -65,7 +68,7 @@ describe('Interop tests', function() { it('should pass ping_pong', function(done) { interop_client.runTest(port, name_override, 'ping_pong', true, done); }); - it.skip('should pass empty_stream', function(done) { + it('should pass empty_stream', function(done) { interop_client.runTest(port, name_override, 'empty_stream', true, done); }); }); diff --git a/test/server_test.js b/test/server_test.js index 457d13d2..5fad9a55 100644 --- a/test/server_test.js +++ b/test/server_test.js @@ -33,7 +33,7 @@ var assert = require('assert'); var grpc = require('bindings')('grpc.node'); -var Server = require('../server'); +var Server = require('../src/server'); /** * This is used for testing functions with multiple asynchronous calls that @@ -65,17 +65,22 @@ function echoHandler(stream) { } describe('echo server', function() { - it('should echo inputs as responses', function(done) { - done = multiDone(done, 4); - var server = new Server(); + var server; + var channel; + before(function() { + server = new Server(); var port_num = server.bind('[::]:0'); server.register('echo', echoHandler); server.start(); + channel = new grpc.Channel('localhost:' + port_num); + }); + it('should echo inputs as responses', function(done) { + done = multiDone(done, 4); + var req_text = 'echo test string'; var status_text = 'OK'; - var channel = new grpc.Channel('localhost:' + port_num); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); var call = new grpc.Call(channel, diff --git a/test/surface_test.js b/test/surface_test.js index 34f1a156..85f4841d 100644 --- a/test/surface_test.js +++ b/test/surface_test.js @@ -33,7 +33,9 @@ var assert = require('assert'); -var surface_server = require('../surface_server.js'); +var surface_server = require('../src/surface_server.js'); + +var surface_client = require('../src/surface_client.js'); var ProtoBuf = require('protobufjs'); @@ -73,3 +75,54 @@ describe('Surface server constructor', function() { }, /math.Math/); }); }); +describe('Surface client', function() { + var client; + var server; + before(function() { + var Server = grpc.buildServer([mathService]); + server = new Server({ + 'math.Math': { + 'div': function(stream) {}, + 'divMany': function(stream) {}, + 'fib': function(stream) {}, + 'sum': function(stream) {} + } + }); + var port = server.bind('localhost:0'); + var Client = surface_client.makeClientConstructor(mathService); + client = new Client('localhost:' + port); + }); + after(function() { + server.shutdown(); + }); + it('Should correctly cancel a unary call', function(done) { + var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) { + assert.strictEqual(err.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a client stream call', function(done) { + var call = client.sum(function(err, resp) { + assert.strictEqual(err.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a server stream call', function(done) { + var call = client.fib({'limit': 5}); + call.on('status', function(status) { + assert.strictEqual(status.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a bidi stream call', function(done) { + var call = client.divMany(); + call.on('status', function(status) { + assert.strictEqual(status.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); +});