Merge pull request #2464 from murgatroid99/node_server_construction_change

Changed to newer, simpler server construction interface
This commit is contained in:
Abhishek Kumar 2015-07-20 13:56:11 -07:00
commit 2cc0578866
11 changed files with 319 additions and 296 deletions

View File

@ -54,10 +54,10 @@ function loadObject(reflectionObject)
Returns the same structure that `load` returns, but takes a reflection object from `ProtoBuf.js` instead of a file name.
```javascript
function buildServer(serviceArray)
function Server([serverOpions])
```
Takes an array of service objects and returns a constructor for a server that handles requests to all of those services.
Constructs a server to which service/implementation pairs can be added.
```javascript

View File

@ -36,8 +36,6 @@
var grpc = require('..');
var math = grpc.load(__dirname + '/math.proto').math;
var Server = grpc.buildServer([math.Math.service]);
/**
* Server function for division. Provides the /Math/DivMany and /Math/Div
* functions (Div is just DivMany with only one stream element). For each
@ -108,19 +106,17 @@ function mathDivMany(stream) {
stream.end();
});
}
var server = new Server({
'math.Math' : {
div: mathDiv,
fib: mathFib,
sum: mathSum,
divMany: mathDivMany
}
var server = new grpc.Server();
server.addProtoService(math.Math.service, {
div: mathDiv,
fib: mathFib,
sum: mathSum,
divMany: mathDivMany
});
if (require.main === module) {
server.bind('0.0.0.0:50051');
server.listen();
server.start();
}
/**

View File

@ -40,8 +40,6 @@ var _ = require('lodash');
var grpc = require('..');
var examples = grpc.load(__dirname + '/route_guide.proto').examples;
var Server = grpc.buildServer([examples.RouteGuide.service]);
var COORD_FACTOR = 1e7;
/**
@ -228,14 +226,14 @@ function routeChat(call) {
* @return {Server} The new server object
*/
function getServer() {
return new Server({
'examples.RouteGuide' : {
getFeature: getFeature,
listFeatures: listFeatures,
recordRoute: recordRoute,
routeChat: routeChat
}
var server = new grpc.Server();
server.addProtoService(examples.RouteGuide.service, {
getFeature: getFeature,
listFeatures: listFeatures,
recordRoute: recordRoute,
routeChat: routeChat
});
return server;
}
if (require.main === module) {

View File

@ -37,8 +37,6 @@ var _ = require('lodash');
var grpc = require('..');
var examples = grpc.load(__dirname + '/stock.proto').examples;
var StockServer = grpc.buildServer([examples.Stock.service]);
function getLastTradePrice(call, callback) {
callback(null, {symbol: call.request.symbol, price: 88});
}
@ -73,13 +71,12 @@ function getLastTradePriceMultiple(call) {
});
}
var stockServer = new StockServer({
'examples.Stock' : {
getLastTradePrice: getLastTradePrice,
getLastTradePriceMultiple: getLastTradePriceMultiple,
watchFutureTrades: watchFutureTrades,
getHighestTradePrice: getHighestTradePrice
}
var stockServer = new grpc.Server();
stockServer.addProtoService(examples.Stock.service, {
getLastTradePrice: getLastTradePrice,
getLastTradePriceMultiple: getLastTradePriceMultiple,
watchFutureTrades: watchFutureTrades,
getHighestTradePrice: getHighestTradePrice
});
if (require.main === module) {

View File

@ -133,9 +133,9 @@ exports.loadObject = loadObject;
exports.load = load;
/**
* See docs for server.makeServerConstructor
* See docs for Server
*/
exports.buildServer = server.makeProtobufServerConstructor;
exports.Server = server.Server;
/**
* Status name to code number mapping
@ -159,5 +159,3 @@ exports.ServerCredentials = grpc.ServerCredentials;
exports.getGoogleAuthDelegate = getGoogleAuthDelegate;
exports.makeGenericClientConstructor = client.makeClientConstructor;
exports.makeGenericServerConstructor = server.makeServerConstructor;

View File

@ -38,7 +38,6 @@ var path = require('path');
var _ = require('lodash');
var grpc = require('..');
var testProto = grpc.load(__dirname + '/test.proto').grpc.testing;
var Server = grpc.buildServer([testProto.TestService.service]);
/**
* Create a buffer filled with size zeroes
@ -173,16 +172,15 @@ function getServer(port, tls) {
key_data,
pem_data);
}
var server = new Server({
'grpc.testing.TestService' : {
emptyCall: handleEmpty,
unaryCall: handleUnary,
streamingOutputCall: handleStreamingOutput,
streamingInputCall: handleStreamingInput,
fullDuplexCall: handleFullDuplex,
halfDuplexCall: handleHalfDuplex
}
}, null, options);
var server = new grpc.Server(options);
server.addProtoService(testProto.TestService.service, {
emptyCall: handleEmpty,
unaryCall: handleUnary,
streamingOutputCall: handleStreamingOutput,
streamingInputCall: handleStreamingInput,
fullDuplexCall: handleFullDuplex,
halfDuplexCall: handleHalfDuplex
});
var port_num = server.bind('0.0.0.0:' + port, server_creds);
return {server: server, port: port_num};
}

View File

@ -72,6 +72,9 @@ function handleError(call, error) {
status.metadata = error.metadata;
}
var error_batch = {};
if (!call.metadataSent) {
error_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
}
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(error_batch, function(){});
}
@ -115,6 +118,10 @@ function sendUnaryResponse(call, value, serialize, metadata) {
if (metadata) {
status.metadata = metadata;
}
if (!call.metadataSent) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
call.metadataSent = true;
}
end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
@ -136,6 +143,10 @@ function setUpWritable(stream, serialize) {
stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() {
var batch = {};
if (!stream.call.metadataSent) {
stream.call.metadataSent = true;
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
}
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
stream.call.startBatch(batch, function(){});
}
@ -239,6 +250,10 @@ function ServerWritableStream(call, serialize) {
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
if (!this.call.metadataSent) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
this.call.metadataSent = true;
}
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
this.call.startBatch(batch, function(err, value) {
if (err) {
@ -251,6 +266,23 @@ function _write(chunk, encoding, callback) {
ServerWritableStream.prototype._write = _write;
function sendMetadata(responseMetadata) {
/* jshint validthis: true */
if (!this.call.metadataSent) {
this.call.metadataSent = true;
var batch = [];
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
this.call.startBatch(batch, function(err) {
if (err) {
this.emit('error', err);
return;
}
});
}
}
ServerWritableStream.prototype.sendMetadata = sendMetadata;
util.inherits(ServerReadableStream, Readable);
/**
@ -339,6 +371,7 @@ function ServerDuplexStream(call, serialize, deserialize) {
ServerDuplexStream.prototype._read = _read;
ServerDuplexStream.prototype._write = _write;
ServerDuplexStream.prototype.sendMetadata = sendMetadata;
/**
* Fully handle a unary call
@ -348,12 +381,20 @@ ServerDuplexStream.prototype._write = _write;
*/
function handleUnary(call, handler, metadata) {
var emitter = new EventEmitter();
emitter.sendMetadata = function(responseMetadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
call.startBatch(batch, function() {});
}
};
emitter.on('error', function(error) {
handleError(call, error);
});
emitter.metadata = metadata;
waitForCancel(call, emitter);
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
if (err) {
@ -392,8 +433,8 @@ function handleUnary(call, handler, metadata) {
function handleServerStreaming(call, handler, metadata) {
var stream = new ServerWritableStream(call, handler.serialize);
waitForCancel(call, stream);
stream.metadata = metadata;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
if (err) {
@ -419,13 +460,19 @@ function handleServerStreaming(call, handler, metadata) {
*/
function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, handler.deserialize);
stream.sendMetadata = function(responseMetadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
call.startBatch(batch, function() {});
}
};
stream.on('error', function(error) {
handleError(call, error);
});
waitForCancel(call, stream);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
call.startBatch(metadata_batch, function() {});
stream.metadata = metadata;
handler.func(stream, function(err, value, trailer) {
stream.terminate();
if (err) {
@ -449,9 +496,7 @@ function handleBidiStreaming(call, handler, metadata) {
var stream = new ServerDuplexStream(call, handler.serialize,
handler.deserialize);
waitForCancel(call, stream);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
call.startBatch(metadata_batch, function() {});
stream.metadata = metadata;
handler.func(stream);
}
@ -466,29 +511,28 @@ var streamHandlers = {
* Constructs a server object that stores request handlers and delegates
* incoming requests to those handlers
* @constructor
* @param {function(string, Object<string, Array<Buffer>>):
Object<string, Array<Buffer|string>>=} getMetadata Callback that gets
* metatada for a given method
* @param {Object=} options Options that should be passed to the internal server
* implementation
*/
function Server(getMetadata, options) {
function Server(options) {
this.handlers = {};
var handlers = this.handlers;
var server = new grpc.Server(options);
this._server = server;
this.started = false;
/**
* Start the server and begin handling requests
* @this Server
*/
this.listen = function() {
this.start = function() {
if (this.started) {
throw new Error('Server is already running');
}
this.started = true;
console.log('Server starting');
_.each(handlers, function(handler, handler_name) {
console.log('Serving', handler_name);
});
if (this.started) {
throw 'Server is already running';
}
server.start();
/**
* Handles the SERVER_RPC_NEW event. If there is a handler associated with
@ -523,11 +567,7 @@ function Server(getMetadata, options) {
call.startBatch(batch, function() {});
return;
}
var response_metadata = {};
if (getMetadata) {
response_metadata = getMetadata(method, metadata);
}
streamHandlers[handler.type](call, handler, response_metadata);
streamHandlers[handler.type](call, handler, metadata);
}
server.requestCall(handleNewCall);
};
@ -565,6 +605,47 @@ Server.prototype.register = function(name, handler, serialize, deserialize,
return true;
};
Server.prototype.addService = function(service, implementation) {
if (this.started) {
throw new Error('Can\'t add a service to a started server.');
}
var self = this;
_.each(service, function(attrs, name) {
var method_type;
if (attrs.requestStream) {
if (attrs.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (attrs.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
if (implementation[name] === undefined) {
throw new Error('Method handler for ' + attrs.path +
' not provided.');
}
var serialize = attrs.responseSerialize;
var deserialize = attrs.requestDeserialize;
var register_success = self.register(attrs.path,
_.bind(implementation[name],
implementation),
serialize, deserialize, method_type);
if (!register_success) {
throw new Error('Method handler for ' + attrs.path +
' already provided.');
}
});
};
Server.prototype.addProtoService = function(service, implementation) {
this.addService(common.getProtobufServiceAttrs(service), implementation);
};
/**
* Binds the server to the given port, with SSL enabled if creds is given
* @param {string} port The port that the server should bind on, in the format
@ -573,6 +654,9 @@ Server.prototype.register = function(name, handler, serialize, deserialize,
* nothing for an insecure port
*/
Server.prototype.bind = function(port, creds) {
if (this.started) {
throw new Error('Can\'t bind an already running server to an address');
}
if (creds) {
return this._server.addSecureHttp2Port(port, creds);
} else {
@ -581,131 +665,6 @@ Server.prototype.bind = function(port, creds) {
};
/**
* Create a constructor for servers with services defined by service_attr_map.
* That is an object that maps (namespaced) service names to objects that in
* turn map method names to objects with the following keys:
* path: The path on the server for accessing the method. For example, for
* protocol buffers, we use "/service_name/method_name"
* requestStream: bool indicating whether the client sends a stream
* resonseStream: bool indicating whether the server sends a stream
* requestDeserialize: function to deserialize request objects
* responseSerialize: function to serialize response objects
* @param {Object} service_attr_map An object mapping service names to method
* attribute map objects
* @return {function(Object, function, Object=)} New server constructor
* See documentation for Server
*/
function makeServerConstructor(service_attr_map) {
/**
* Create a server with the given handlers for all of the methods.
* @constructor
* @param {Object} service_handlers Map from service names to map from method
* names to handlers
* @param {function(string, Object<string, Array<Buffer>>):
Object<string, Array<Buffer|string>>=} getMetadata Callback that
* gets metatada for a given method
* @param {Object=} options Options to pass to the underlying server
*/
function SurfaceServer(service_handlers, getMetadata, options) {
var server = new Server(getMetadata, options);
this.inner_server = server;
_.each(service_attr_map, function(service_attrs, service_name) {
if (service_handlers[service_name] === undefined) {
throw new Error('Handlers for service ' +
service_name + ' not provided.');
}
_.each(service_attrs, function(attrs, name) {
var method_type;
if (attrs.requestStream) {
if (attrs.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (attrs.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
if (service_handlers[service_name][name] === undefined) {
throw new Error('Method handler for ' + attrs.path +
' not provided.');
}
var serialize = attrs.responseSerialize;
var deserialize = attrs.requestDeserialize;
server.register(attrs.path, _.bind(service_handlers[service_name][name],
service_handlers[service_name]),
serialize, deserialize, method_type);
});
}, this);
}
/**
* Binds the server to the given port, with SSL enabled if creds is supplied
* @param {string} port The port that the server should bind on, in the format
* "address:port"
* @param {boolean=} creds Credentials to use for SSL
* @return {SurfaceServer} this
*/
SurfaceServer.prototype.bind = function(port, creds) {
return this.inner_server.bind(port, creds);
};
/**
* Starts the server listening on any bound ports
* @return {SurfaceServer} this
*/
SurfaceServer.prototype.listen = function() {
this.inner_server.listen();
return this;
};
/**
* Shuts the server down; tells it to stop listening for new requests and to
* kill old requests.
*/
SurfaceServer.prototype.shutdown = function() {
this.inner_server.shutdown();
};
return SurfaceServer;
}
/**
* Create a constructor for servers that serve the given services.
* @param {Array<ProtoBuf.Reflect.Service>} services The services that the
* servers will serve
* @return {function(Object, function, Object=)} New server constructor
*/
function makeProtobufServerConstructor(services) {
var qual_names = [];
var service_attr_map = {};
_.each(services, function(service) {
var service_name = common.fullyQualifiedName(service);
_.each(service.children, function(method) {
var name = common.fullyQualifiedName(method);
if (_.indexOf(qual_names, name) !== -1) {
throw new Error('Method ' + name + ' exposed by more than one service');
}
qual_names.push(name);
});
var method_attrs = common.getProtobufServiceAttrs(service);
if (!service_attr_map.hasOwnProperty(service_name)) {
service_attr_map[service_name] = {};
}
service_attr_map[service_name] = _.extend(service_attr_map[service_name],
method_attrs);
});
return makeServerConstructor(service_attr_map);
}
/**
* See documentation for makeServerConstructor
*/
exports.makeServerConstructor = makeServerConstructor;
/**
* See documentation for makeProtobufServerConstructor
*/
exports.makeProtobufServerConstructor = makeProtobufServerConstructor;
exports.Server = Server;

View File

@ -49,14 +49,13 @@ describe('Health Checking', function() {
'grpc.test.TestService': 'SERVING'
}
};
var HealthServer = grpc.buildServer([health.service]);
var healthServer = new HealthServer({
'grpc.health.v1alpha.Health': new health.Implementation(statusMap)
});
var healthServer = new grpc.Server();
healthServer.addProtoService(health.service,
new health.Implementation(statusMap));
var healthClient;
before(function() {
var port_num = healthServer.bind('0.0.0.0:0');
healthServer.listen();
healthServer.start();
healthClient = new health.Client('localhost:' + port_num);
});
after(function() {

View File

@ -46,7 +46,7 @@ describe('Interop tests', function() {
before(function(done) {
var server_obj = interop_server.getServer(0, true);
server = server_obj.server;
server.listen();
server.start();
port = 'localhost:' + server_obj.port;
done();
});

View File

@ -52,7 +52,7 @@ var server = require('../examples/math_server.js');
describe('Math client', function() {
before(function(done) {
var port_num = server.bind('0.0.0.0:0');
server.listen();
server.start();
math_client = new math.Math('localhost:' + port_num);
done();
});

View File

@ -69,34 +69,45 @@ describe('File loader', function() {
});
});
});
describe('Surface server constructor', function() {
it('Should fail with conflicting method names', function() {
assert.throws(function() {
grpc.buildServer([mathService, mathService]);
});
describe('Server.prototype.addProtoService', function() {
var server;
var dummyImpls = {
'div': function() {},
'divMany': function() {},
'fib': function() {},
'sum': function() {}
};
beforeEach(function() {
server = new grpc.Server();
});
afterEach(function() {
server.shutdown();
});
it('Should succeed with a single service', function() {
assert.doesNotThrow(function() {
grpc.buildServer([mathService]);
server.addProtoService(mathService, dummyImpls);
});
});
it('Should fail with conflicting method names', function() {
server.addProtoService(mathService, dummyImpls);
assert.throws(function() {
server.addProtoService(mathService, dummyImpls);
});
});
it('Should fail with missing handlers', function() {
var Server = grpc.buildServer([mathService]);
assert.throws(function() {
new Server({
'math.Math': {
'div': function() {},
'divMany': function() {},
'fib': function() {}
}
server.addProtoService(mathService, {
'div': function() {},
'divMany': function() {},
'fib': function() {}
});
}, /math.Math.Sum/);
});
it('Should fail with no handlers for the service', function() {
var Server = grpc.buildServer([mathService]);
it('Should fail if the server has been started', function() {
server.start();
assert.throws(function() {
new Server({});
}, /math.Math/);
server.addProtoService(mathService, dummyImpls);
});
});
});
describe('Echo service', function() {
@ -105,18 +116,16 @@ describe('Echo service', function() {
before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/echo_service.proto');
var echo_service = test_proto.lookup('EchoService');
var Server = grpc.buildServer([echo_service]);
server = new Server({
'EchoService': {
echo: function(call, callback) {
callback(null, call.request);
}
server = new grpc.Server();
server.addProtoService(echo_service, {
echo: function(call, callback) {
callback(null, call.request);
}
});
var port = server.bind('localhost:0');
var Client = surface_client.makeProtobufClientConstructor(echo_service);
client = new Client('localhost:' + port);
server.listen();
server.start();
});
after(function() {
server.shutdown();
@ -151,18 +160,14 @@ describe('Generic client and server', function() {
var client;
var server;
before(function() {
var Server = grpc.makeGenericServerConstructor({
string: string_service_attrs
});
server = new Server({
string: {
capitalize: function(call, callback) {
callback(null, _.capitalize(call.request));
}
server = new grpc.Server();
server.addService(string_service_attrs, {
capitalize: function(call, callback) {
callback(null, _.capitalize(call.request));
}
});
var port = server.bind('localhost:0');
server.listen();
server.start();
var Client = grpc.makeGenericClientConstructor(string_service_attrs);
client = new Client('localhost:' + port);
});
@ -178,6 +183,82 @@ describe('Generic client and server', function() {
});
});
});
describe('Echo metadata', function() {
var client;
var server;
before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
var test_service = test_proto.lookup('TestService');
server = new grpc.Server();
server.addProtoService(test_service, {
unary: function(call, cb) {
call.sendMetadata(call.metadata);
cb(null, {});
},
clientStream: function(stream, cb){
stream.on('data', function(data) {});
stream.on('end', function() {
stream.sendMetadata(stream.metadata);
cb(null, {});
});
},
serverStream: function(stream) {
stream.sendMetadata(stream.metadata);
stream.end();
},
bidiStream: function(stream) {
stream.on('data', function(data) {});
stream.on('end', function() {
stream.sendMetadata(stream.metadata);
stream.end();
});
}
});
var port = server.bind('localhost:0');
var Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port);
server.start();
});
after(function() {
server.shutdown();
});
it('with unary call', function(done) {
var call = client.unary({}, function(err, data) {
assert.ifError(err);
}, {key: ['value']});
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
done();
});
});
it('with client stream call', function(done) {
var call = client.clientStream(function(err, data) {
assert.ifError(err);
}, {key: ['value']});
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
done();
});
call.end();
});
it('with server stream call', function(done) {
var call = client.serverStream({}, {key: ['value']});
call.on('data', function() {});
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
done();
});
});
it('with bidi stream call', function(done) {
var call = client.bidiStream({key: ['value']});
call.on('data', function() {});
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
done();
});
call.end();
});
});
describe('Other conditions', function() {
var client;
var server;
@ -185,72 +266,70 @@ describe('Other conditions', function() {
before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
var test_service = test_proto.lookup('TestService');
var Server = grpc.buildServer([test_service]);
server = new Server({
TestService: {
unary: function(call, cb) {
var req = call.request;
if (req.error) {
server = new grpc.Server();
server.addProtoService(test_service, {
unary: function(call, cb) {
var req = call.request;
if (req.error) {
cb(new Error('Requested error'), null, {metadata: ['yes']});
} else {
cb(null, {count: 1}, {metadata: ['yes']});
}
},
clientStream: function(stream, cb){
var count = 0;
var errored;
stream.on('data', function(data) {
if (data.error) {
errored = true;
cb(new Error('Requested error'), null, {metadata: ['yes']});
} else {
cb(null, {count: 1}, {metadata: ['yes']});
count += 1;
}
},
clientStream: function(stream, cb){
var count = 0;
var errored;
stream.on('data', function(data) {
if (data.error) {
errored = true;
cb(new Error('Requested error'), null, {metadata: ['yes']});
} else {
count += 1;
}
});
stream.on('end', function() {
if (!errored) {
cb(null, {count: count}, {metadata: ['yes']});
}
});
},
serverStream: function(stream) {
var req = stream.request;
if (req.error) {
});
stream.on('end', function() {
if (!errored) {
cb(null, {count: count}, {metadata: ['yes']});
}
});
},
serverStream: function(stream) {
var req = stream.request;
if (req.error) {
var err = new Error('Requested error');
err.metadata = {metadata: ['yes']};
stream.emit('error', err);
} else {
for (var i = 0; i < 5; i++) {
stream.write({count: i});
}
stream.end({metadata: ['yes']});
}
},
bidiStream: function(stream) {
var count = 0;
stream.on('data', function(data) {
if (data.error) {
var err = new Error('Requested error');
err.metadata = {metadata: ['yes']};
err.metadata = {
metadata: ['yes'],
count: ['' + count]
};
stream.emit('error', err);
} else {
for (var i = 0; i < 5; i++) {
stream.write({count: i});
}
stream.end({metadata: ['yes']});
stream.write({count: count});
count += 1;
}
},
bidiStream: function(stream) {
var count = 0;
stream.on('data', function(data) {
if (data.error) {
var err = new Error('Requested error');
err.metadata = {
metadata: ['yes'],
count: ['' + count]
};
stream.emit('error', err);
} else {
stream.write({count: count});
count += 1;
}
});
stream.on('end', function() {
stream.end({metadata: ['yes']});
});
}
});
stream.on('end', function() {
stream.end({metadata: ['yes']});
});
}
});
port = server.bind('localhost:0');
var Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port);
server.listen();
server.start();
});
after(function() {
server.shutdown();
@ -465,18 +544,17 @@ describe('Cancelling surface client', function() {
var client;
var server;
before(function() {
var Server = grpc.buildServer([mathService]);
server = new Server({
'math.Math': {
'div': function(stream) {},
'divMany': function(stream) {},
'fib': function(stream) {},
'sum': function(stream) {}
}
server = new grpc.Server();
server.addProtoService(mathService, {
'div': function(stream) {},
'divMany': function(stream) {},
'fib': function(stream) {},
'sum': function(stream) {}
});
var port = server.bind('localhost:0');
var Client = surface_client.makeProtobufClientConstructor(mathService);
client = new Client('localhost:' + port);
server.start();
});
after(function() {
server.shutdown();