sync up with master

This commit is contained in:
Yang Gao 2015-01-27 11:39:14 -08:00
commit adb24862ab
37 changed files with 333 additions and 297 deletions

View File

@ -28,17 +28,17 @@
},
"target_name": "grpc",
"sources": [
"byte_buffer.cc",
"call.cc",
"channel.cc",
"completion_queue_async_worker.cc",
"credentials.cc",
"event.cc",
"node_grpc.cc",
"server.cc",
"server_credentials.cc",
"tag.cc",
"timeval.cc"
"ext/byte_buffer.cc",
"ext/call.cc",
"ext/channel.cc",
"ext/completion_queue_async_worker.cc",
"ext/credentials.cc",
"ext/event.cc",
"ext/node_grpc.cc",
"ext/server.cc",
"ext/server_credentials.cc",
"ext/tag.cc",
"ext/timeval.cc"
],
'conditions' : [
['no_install=="yes"', {

View File

@ -52,7 +52,8 @@ var Server = grpc.buildServer([math.Math.service]);
*/
function mathDiv(call, cb) {
var req = call.request;
if (req.divisor == 0) {
// Unary + is explicit coersion to integer
if (+req.divisor === 0) {
cb(new Error('cannot divide by zero'));
}
cb(null, {
@ -89,7 +90,7 @@ function mathSum(call, cb) {
// Here, call is a standard readable Node object Stream
var sum = 0;
call.on('data', function(data) {
sum += data.num | 0;
sum += (+data.num);
});
call.on('end', function() {
cb(null, {num: sum});
@ -104,7 +105,7 @@ function mathDivMany(stream) {
Transform.call(this, options);
}
DivTransform.prototype._transform = function(div_args, encoding, callback) {
if (div_args.divisor == 0) {
if (+div_args.divisor === 0) {
callback(new Error('cannot divide by zero'));
}
callback(null, {

View File

View File

View File

View File

@ -35,9 +35,9 @@ var _ = require('underscore');
var ProtoBuf = require('protobufjs');
var surface_client = require('./surface_client.js');
var surface_client = require('./src/surface_client.js');
var surface_server = require('./surface_server.js');
var surface_server = require('./src/surface_server.js');
var grpc = require('bindings')('grpc');

View File

@ -183,7 +183,7 @@ function pingPong(client, done) {
assert.equal(response.payload.body.limit - response.payload.body.offset,
response_sizes[index]);
index += 1;
if (index == 4) {
if (index === 4) {
call.end();
} else {
call.write({

View File

@ -17,5 +17,5 @@
"mocha": "~1.21.0",
"minimist": "^1.1.0"
},
"main": "main.js"
"main": "index.js"
}

View File

@ -105,7 +105,7 @@ function GrpcClientStream(call, serialize, deserialize) {
return;
}
var data = event.data;
if (self.push(data) && data != null) {
if (self.push(self.deserialize(data)) && data != null) {
self._call.startRead(readCallback);
} else {
reading = false;
@ -155,11 +155,19 @@ GrpcClientStream.prototype._read = function(size) {
*/
GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
var self = this;
self._call.startWrite(chunk, function(event) {
self._call.startWrite(self.serialize(chunk), function(event) {
callback();
}, 0);
};
/**
* Cancel the ongoing call. If the call has not already finished, it will finish
* with status CANCELLED.
*/
GrpcClientStream.prototype.cancel = function() {
this._call.cancel();
};
/**
* Make a request on the channel to the given method with the given arguments
* @param {grpc.Channel} channel The channel on which to make the request
@ -185,7 +193,7 @@ function makeRequest(channel,
if (metadata) {
call.addMetadata(metadata);
}
return new GrpcClientStream(call);
return new GrpcClientStream(call, serialize, deserialize);
}
/**

View File

@ -151,7 +151,7 @@ function GrpcServerStream(call, serialize, deserialize) {
return;
}
var data = event.data;
if (self.push(deserialize(data)) && data != null) {
if (self.push(self.deserialize(data)) && data != null) {
self._call.startRead(readCallback);
} else {
reading = false;
@ -233,7 +233,7 @@ function Server(options) {
function handleNewCall(event) {
var call = event.call;
var data = event.data;
if (data == null) {
if (data === null) {
return;
}
server.requestCall(handleNewCall);
@ -246,6 +246,7 @@ function Server(options) {
call.serverAccept(function(event) {
if (event.data.code === grpc.status.CANCELLED) {
cancelled = true;
stream.emit('cancelled');
}
}, 0);
call.serverEndInitialMetadata(0);

View File

@ -63,80 +63,25 @@ util.inherits(ClientReadableObjectStream, Readable);
* client side. Extends from stream.Readable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(Buffer)} deserialize Function for deserializing binary data
* @param {object} options Stream options
*/
function ClientReadableObjectStream(stream, deserialize, options) {
options = _.extend(options, {objectMode: true});
function ClientReadableObjectStream(stream) {
var options = {objectMode: true};
Readable.call(this, options);
this._stream = stream;
var self = this;
forwardEvent(stream, this, 'status');
forwardEvent(stream, this, 'metadata');
this._stream.on('data', function forwardData(chunk) {
if (!self.push(deserialize(chunk))) {
if (!self.push(chunk)) {
self._stream.pause();
}
});
this._stream.pause();
}
util.inherits(ClientWritableObjectStream, Writable);
/**
* Class for representing a gRPC client streaming call as a Node stream on the
* client side. Extends from stream.Writable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(*):Buffer} serialize Function for serializing objects
* @param {object} options Stream options
*/
function ClientWritableObjectStream(stream, serialize, options) {
options = _.extend(options, {objectMode: true});
Writable.call(this, options);
this._stream = stream;
this._serialize = serialize;
forwardEvent(stream, this, 'status');
forwardEvent(stream, this, 'metadata');
this.on('finish', function() {
this._stream.end();
});
}
util.inherits(ClientBidiObjectStream, Duplex);
/**
* Class for representing a gRPC bidi streaming call as a Node stream on the
* client side. Extends from stream.Duplex.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(*):Buffer} serialize Function for serializing objects
* @param {function(Buffer)} deserialize Function for deserializing binary data
* @param {object} options Stream options
*/
function ClientBidiObjectStream(stream, serialize, deserialize, options) {
options = _.extend(options, {objectMode: true});
Duplex.call(this, options);
this._stream = stream;
this._serialize = serialize;
var self = this;
forwardEvent(stream, this, 'status');
forwardEvent(stream, this, 'metadata');
this._stream.on('data', function forwardData(chunk) {
if (!self.push(deserialize(chunk))) {
self._stream.pause();
}
});
this._stream.pause();
this.on('finish', function() {
this._stream.end();
});
}
/**
* _read implementation for both types of streams that allow reading.
* @this {ClientReadableObjectStream|ClientBidiObjectStream}
* @this {ClientReadableObjectStream}
* @param {number} size Ignored
*/
function _read(size) {
@ -147,30 +92,51 @@ function _read(size) {
* See docs for _read
*/
ClientReadableObjectStream.prototype._read = _read;
util.inherits(ClientWritableObjectStream, Writable);
/**
* See docs for _read
* Class for representing a gRPC client streaming call as a Node stream on the
* client side. Extends from stream.Writable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
*/
ClientBidiObjectStream.prototype._read = _read;
function ClientWritableObjectStream(stream) {
var options = {objectMode: true};
Writable.call(this, options);
this._stream = stream;
forwardEvent(stream, this, 'status');
forwardEvent(stream, this, 'metadata');
this.on('finish', function() {
this._stream.end();
});
}
/**
* _write implementation for both types of streams that allow writing
* @this {ClientWritableObjectStream|ClientBidiObjectStream}
* @this {ClientWritableObjectStream}
* @param {*} chunk The value to write to the stream
* @param {string} encoding Ignored
* @param {function(Error)} callback Callback to call when finished writing
*/
function _write(chunk, encoding, callback) {
this._stream.write(this._serialize(chunk), encoding, callback);
this._stream.write(chunk, encoding, callback);
}
/**
* See docs for _write
*/
ClientWritableObjectStream.prototype._write = _write;
/**
* See docs for _write
* Cancel the underlying call
*/
ClientBidiObjectStream.prototype._write = _write;
function cancel() {
this._stream.cancel();
}
ClientReadableObjectStream.prototype.cancel = cancel;
ClientWritableObjectStream.prototype.cancel = cancel;
/**
* Get a function that can make unary requests to the specified method.
@ -196,19 +162,28 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
* @return {EventEmitter} An event emitter for stream related events
*/
function makeUnaryRequest(argument, callback, metadata, deadline) {
var stream = client.makeRequest(this.channel, method, metadata, deadline);
var stream = client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
var emitter = new EventEmitter();
emitter.cancel = function cancel() {
stream.cancel();
};
forwardEvent(stream, emitter, 'status');
forwardEvent(stream, emitter, 'metadata');
stream.write(serialize(argument));
stream.write(argument);
stream.end();
stream.on('data', function forwardData(chunk) {
try {
callback(null, deserialize(chunk));
callback(null, chunk);
} catch (e) {
callback(e);
}
});
stream.on('status', function forwardStatus(status) {
if (status.code !== client.status.OK) {
callback(status);
}
});
return emitter;
}
return makeUnaryRequest;
@ -236,15 +211,21 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
* @return {EventEmitter} An event emitter for stream related events
*/
function makeClientStreamRequest(callback, metadata, deadline) {
var stream = client.makeRequest(this.channel, method, metadata, deadline);
var obj_stream = new ClientWritableObjectStream(stream, serialize, {});
var stream = client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
var obj_stream = new ClientWritableObjectStream(stream);
stream.on('data', function forwardData(chunk) {
try {
callback(null, deserialize(chunk));
callback(null, chunk);
} catch (e) {
callback(e);
}
});
stream.on('status', function forwardStatus(status) {
if (status.code !== client.status.OK) {
callback(status);
}
});
return obj_stream;
}
return makeClientStreamRequest;
@ -272,9 +253,10 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
* @return {EventEmitter} An event emitter for stream related events
*/
function makeServerStreamRequest(argument, metadata, deadline) {
var stream = client.makeRequest(this.channel, method, metadata, deadline);
var obj_stream = new ClientReadableObjectStream(stream, deserialize, {});
stream.write(serialize(argument));
var stream = client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
var obj_stream = new ClientReadableObjectStream(stream);
stream.write(argument);
stream.end();
return obj_stream;
}
@ -301,12 +283,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
* @return {EventEmitter} An event emitter for stream related events
*/
function makeBidiStreamRequest(metadata, deadline) {
var stream = client.makeRequest(this.channel, method, metadata, deadline);
var obj_stream = new ClientBidiObjectStream(stream,
serialize,
deserialize,
{});
return obj_stream;
return client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
}
return makeBidiStreamRequest;
}

View File

@ -54,19 +54,20 @@ util.inherits(ServerReadableObjectStream, Readable);
* server side. Extends from stream.Readable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(Buffer)} deserialize Function for deserializing binary data
* @param {object} options Stream options
*/
function ServerReadableObjectStream(stream, deserialize, options) {
options = _.extend(options, {objectMode: true});
function ServerReadableObjectStream(stream) {
var options = {objectMode: true};
Readable.call(this, options);
this._stream = stream;
Object.defineProperty(this, 'cancelled', {
get: function() { return stream.cancelled; }
});
var self = this;
this._stream.on('cancelled', function() {
self.emit('cancelled');
});
this._stream.on('data', function forwardData(chunk) {
if (!self.push(deserialize(chunk))) {
if (!self.push(chunk)) {
self._stream.pause();
}
});
@ -76,57 +77,6 @@ function ServerReadableObjectStream(stream, deserialize, options) {
this._stream.pause();
}
util.inherits(ServerWritableObjectStream, Writable);
/**
* Class for representing a gRPC server streaming call as a Node stream on the
* server side. Extends from stream.Writable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(*):Buffer} serialize Function for serializing objects
* @param {object} options Stream options
*/
function ServerWritableObjectStream(stream, serialize, options) {
options = _.extend(options, {objectMode: true});
Writable.call(this, options);
this._stream = stream;
this._serialize = serialize;
this.on('finish', function() {
this._stream.end();
});
}
util.inherits(ServerBidiObjectStream, Duplex);
/**
* Class for representing a gRPC bidi streaming call as a Node stream on the
* server side. Extends from stream.Duplex.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(*):Buffer} serialize Function for serializing objects
* @param {function(Buffer)} deserialize Function for deserializing binary data
* @param {object} options Stream options
*/
function ServerBidiObjectStream(stream, serialize, deserialize, options) {
options = _.extend(options, {objectMode: true});
Duplex.call(this, options);
this._stream = stream;
this._serialize = serialize;
var self = this;
this._stream.on('data', function forwardData(chunk) {
if (!self.push(deserialize(chunk))) {
self._stream.pause();
}
});
this._stream.on('end', function forwardEnd() {
self.push(null);
});
this._stream.pause();
this.on('finish', function() {
this._stream.end();
});
}
/**
* _read implementation for both types of streams that allow reading.
* @this {ServerReadableObjectStream|ServerBidiObjectStream}
@ -140,39 +90,49 @@ function _read(size) {
* See docs for _read
*/
ServerReadableObjectStream.prototype._read = _read;
util.inherits(ServerWritableObjectStream, Writable);
/**
* See docs for _read
* Class for representing a gRPC server streaming call as a Node stream on the
* server side. Extends from stream.Writable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
*/
ServerBidiObjectStream.prototype._read = _read;
function ServerWritableObjectStream(stream) {
var options = {objectMode: true};
Writable.call(this, options);
this._stream = stream;
this._stream.on('cancelled', function() {
self.emit('cancelled');
});
this.on('finish', function() {
this._stream.end();
});
}
/**
* _write implementation for both types of streams that allow writing
* @this {ServerWritableObjectStream|ServerBidiObjectStream}
* @this {ServerWritableObjectStream}
* @param {*} chunk The value to write to the stream
* @param {string} encoding Ignored
* @param {function(Error)} callback Callback to call when finished writing
*/
function _write(chunk, encoding, callback) {
this._stream.write(this._serialize(chunk), encoding, callback);
this._stream.write(chunk, encoding, callback);
}
/**
* See docs for _write
*/
ServerWritableObjectStream.prototype._write = _write;
/**
* See docs for _write
*/
ServerBidiObjectStream.prototype._write = _write;
/**
* Creates a binary stream handler function from a unary handler function
* @param {function(Object, function(Error, *))} handler Unary call handler
* @param {function(*):Buffer} serialize Serialization function
* @param {function(Buffer):*} deserialize Deserialization function
* @return {function(stream)} Binary stream handler
*/
function makeUnaryHandler(handler, serialize, deserialize) {
function makeUnaryHandler(handler) {
/**
* Handles a stream by reading a single data value, passing it to the handler,
* and writing the response back to the stream.
@ -180,15 +140,18 @@ function makeUnaryHandler(handler, serialize, deserialize) {
*/
return function handleUnaryCall(stream) {
stream.on('data', function handleUnaryData(value) {
var call = {request: deserialize(value)};
var call = {request: value};
Object.defineProperty(call, 'cancelled', {
get: function() { return stream.cancelled;}
});
stream.on('cancelled', function() {
call.emit('cancelled');
});
handler(call, function sendUnaryData(err, value) {
if (err) {
stream.emit('error', err);
} else {
stream.write(serialize(value));
stream.write(value);
stream.end();
}
});
@ -201,23 +164,21 @@ function makeUnaryHandler(handler, serialize, deserialize) {
* function
* @param {function(Readable, function(Error, *))} handler Client stream call
* handler
* @param {function(*):Buffer} serialize Serialization function
* @param {function(Buffer):*} deserialize Deserialization function
* @return {function(stream)} Binary stream handler
*/
function makeClientStreamHandler(handler, serialize, deserialize) {
function makeClientStreamHandler(handler) {
/**
* Handles a stream by passing a deserializing stream to the handler and
* writing the response back to the stream.
* @param {stream} stream Binary data stream
*/
return function handleClientStreamCall(stream) {
var object_stream = new ServerReadableObjectStream(stream, deserialize, {});
var object_stream = new ServerReadableObjectStream(stream);
handler(object_stream, function sendClientStreamData(err, value) {
if (err) {
stream.emit('error', err);
} else {
stream.write(serialize(value));
stream.write(value);
stream.end();
}
});
@ -228,11 +189,9 @@ function makeClientStreamHandler(handler, serialize, deserialize) {
* Creates a binary stream handler function from a server stream handler
* function
* @param {function(Writable)} handler Server stream call handler
* @param {function(*):Buffer} serialize Serialization function
* @param {function(Buffer):*} deserialize Deserialization function
* @return {function(stream)} Binary stream handler
*/
function makeServerStreamHandler(handler, serialize, deserialize) {
function makeServerStreamHandler(handler) {
/**
* Handles a stream by attaching it to a serializing stream, and passing it to
* the handler.
@ -240,10 +199,8 @@ function makeServerStreamHandler(handler, serialize, deserialize) {
*/
return function handleServerStreamCall(stream) {
stream.on('data', function handleClientData(value) {
var object_stream = new ServerWritableObjectStream(stream,
serialize,
{});
object_stream.request = deserialize(value);
var object_stream = new ServerWritableObjectStream(stream);
object_stream.request = value;
handler(object_stream);
});
};
@ -252,23 +209,10 @@ function makeServerStreamHandler(handler, serialize, deserialize) {
/**
* Creates a binary stream handler function from a bidi stream handler function
* @param {function(Duplex)} handler Unary call handler
* @param {function(*):Buffer} serialize Serialization function
* @param {function(Buffer):*} deserialize Deserialization function
* @return {function(stream)} Binary stream handler
*/
function makeBidiStreamHandler(handler, serialize, deserialize) {
/**
* Handles a stream by wrapping it in a serializing and deserializing object
* stream, and passing it to the handler.
* @param {stream} stream Binary data stream
*/
return function handleBidiStreamCall(stream) {
var object_stream = new ServerBidiObjectStream(stream,
serialize,
deserialize,
{});
handler(object_stream);
};
function makeBidiStreamHandler(handler) {
return handler;
}
/**
@ -341,10 +285,13 @@ function makeServerConstructor(services) {
common.fullyQualifiedName(method) + ' not provided.');
}
var binary_handler = handler_makers[method_type](
service_handlers[service_name][decapitalize(method.name)],
common.serializeCls(method.resolvedResponseType.build()),
common.deserializeCls(method.resolvedRequestType.build()));
server.register(prefix + capitalize(method.name), binary_handler);
service_handlers[service_name][decapitalize(method.name)]);
var serialize = common.serializeCls(
method.resolvedResponseType.build());
var deserialize = common.deserializeCls(
method.resolvedRequestType.build());
server.register(prefix + capitalize(method.name), binary_handler,
serialize, deserialize);
});
}, this);
}

View File

@ -34,8 +34,6 @@
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
var channel = new grpc.Channel('localhost:7070');
/**
* Helper function to return an absolute deadline given a relative timeout in
* seconds.
@ -49,6 +47,17 @@ function getDeadline(timeout_secs) {
}
describe('call', function() {
var channel;
var server;
before(function() {
server = new grpc.Server();
var port = server.addHttp2Port('localhost:0');
server.start();
channel = new grpc.Channel('localhost:' + port);
});
after(function() {
server.shutdown();
});
describe('constructor', function() {
it('should reject anything less than 3 arguments', function() {
assert.throws(function() {

View File

@ -35,9 +35,9 @@ var assert = require('assert');
var fs = require('fs');
var path = require('path');
var grpc = require('bindings')('grpc.node');
var Server = require('../server');
var client = require('../client');
var common = require('../common');
var Server = require('../src/server');
var client = require('../src/client');
var common = require('../src/common');
var _ = require('highland');
var ca_path = path.join(__dirname, 'data/ca.pem');
@ -77,15 +77,32 @@ function errorHandler(stream) {
};
}
/**
* Wait for a cancellation instead of responding
* @param {Stream} stream
*/
function cancelHandler(stream) {
// do nothing
}
describe('echo client', function() {
it('should receive echo responses', function(done) {
var server = new Server();
var server;
var channel;
before(function() {
server = new Server();
var port_num = server.bind('0.0.0.0:0');
server.register('echo', echoHandler);
server.register('error', errorHandler);
server.register('cancellation', cancelHandler);
server.start();
channel = new grpc.Channel('localhost:' + port_num);
});
after(function() {
server.shutdown();
});
it('should receive echo responses', function(done) {
var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
var channel = new grpc.Channel('localhost:' + port_num);
var stream = client.makeRequest(
channel,
'echo');
@ -97,78 +114,91 @@ describe('echo client', function() {
assert.equal(messages[index], chunk.toString());
index += 1;
});
stream.on('end', function() {
done();
});
});
it('should get an error status that the server throws', function(done) {
var stream = client.makeRequest(
channel,
'error',
null,
getDeadline(1));
stream.on('data', function() {});
stream.write(new Buffer('test'));
stream.end();
stream.on('status', function(status) {
assert.equal(status.code, grpc.status.UNIMPLEMENTED);
assert.equal(status.details, 'error details');
done();
});
});
it('should be able to cancel a call', function(done) {
var stream = client.makeRequest(
channel,
'cancellation',
null,
getDeadline(1));
stream.cancel();
stream.on('status', function(status) {
assert.equal(status.code, grpc.status.CANCELLED);
done();
});
});
});
/* TODO(mlumish): explore options for reducing duplication between this test
* and the insecure echo client test */
describe('secure echo client', function() {
var server;
var channel;
before(function(done) {
fs.readFile(ca_path, function(err, ca_data) {
assert.ifError(err);
fs.readFile(key_path, function(err, key_data) {
assert.ifError(err);
fs.readFile(pem_path, function(err, pem_data) {
assert.ifError(err);
var creds = grpc.Credentials.createSsl(ca_data);
var server_creds = grpc.ServerCredentials.createSsl(null,
key_data,
pem_data);
server = new Server({'credentials' : server_creds});
var port_num = server.bind('0.0.0.0:0', true);
server.register('echo', echoHandler);
server.start();
channel = new grpc.Channel('localhost:' + port_num, {
'grpc.ssl_target_name_override' : 'foo.test.google.com',
'credentials' : creds
});
done();
});
});
});
});
after(function() {
server.shutdown();
});
it('should recieve echo responses', function(done) {
var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
var stream = client.makeRequest(
channel,
'echo');
_(messages).map(function(val) {
return new Buffer(val);
}).pipe(stream);
var index = 0;
stream.on('data', function(chunk) {
assert.equal(messages[index], chunk.toString());
index += 1;
});
stream.on('end', function() {
server.shutdown();
done();
});
});
it('should get an error status that the server throws', function(done) {
var server = new Server();
var port_num = server.bind('0.0.0.0:0');
server.register('error', errorHandler);
server.start();
var channel = new grpc.Channel('localhost:' + port_num);
var stream = client.makeRequest(
channel,
'error',
null,
getDeadline(1));
stream.on('data', function() {});
stream.write(new Buffer('test'));
stream.end();
stream.on('status', function(status) {
assert.equal(status.code, grpc.status.UNIMPLEMENTED);
assert.equal(status.details, 'error details');
server.shutdown();
done();
});
});
});
/* TODO(mlumish): explore options for reducing duplication between this test
* and the insecure echo client test */
describe('secure echo client', function() {
it('should recieve echo responses', function(done) {
fs.readFile(ca_path, function(err, ca_data) {
assert.ifError(err);
fs.readFile(key_path, function(err, key_data) {
assert.ifError(err);
fs.readFile(pem_path, function(err, pem_data) {
assert.ifError(err);
var creds = grpc.Credentials.createSsl(ca_data);
var server_creds = grpc.ServerCredentials.createSsl(null,
key_data,
pem_data);
var server = new Server({'credentials' : server_creds});
var port_num = server.bind('0.0.0.0:0', true);
server.register('echo', echoHandler);
server.start();
var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
var channel = new grpc.Channel('localhost:' + port_num, {
'grpc.ssl_target_name_override' : 'foo.test.google.com',
'credentials' : creds
});
var stream = client.makeRequest(
channel,
'echo');
_(messages).map(function(val) {
return new Buffer(val);
}).pipe(stream);
var index = 0;
stream.on('data', function(chunk) {
assert.equal(messages[index], chunk.toString());
index += 1;
});
stream.on('end', function() {
server.shutdown();
done();
});
});
});
});
});
});

View File

@ -56,14 +56,21 @@ function multiDone(done, count) {
}
describe('end-to-end', function() {
var server;
var channel;
before(function() {
server = new grpc.Server();
var port_num = server.addHttp2Port('0.0.0.0:0');
server.start();
channel = new grpc.Channel('localhost:' + port_num);
});
after(function() {
server.shutdown();
});
it('should start and end a request without error', function(complete) {
var server = new grpc.Server();
var done = multiDone(function() {
complete();
server.shutdown();
}, 2);
var port_num = server.addHttp2Port('0.0.0.0:0');
var channel = new grpc.Channel('localhost:' + port_num);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'xyz';
@ -81,7 +88,6 @@ describe('end-to-end', function() {
done();
}, 0);
server.start();
server.requestCall(function(event) {
assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
var server_call = event.call;
@ -109,13 +115,10 @@ describe('end-to-end', function() {
it('should send and receive data without error', function(complete) {
var req_text = 'client_request';
var reply_text = 'server_response';
var server = new grpc.Server();
var done = multiDone(function() {
complete();
server.shutdown();
}, 6);
var port_num = server.addHttp2Port('0.0.0.0:0');
var channel = new grpc.Channel('localhost:' + port_num);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'success';
@ -151,8 +154,6 @@ describe('end-to-end', function() {
assert.strictEqual(event.data.toString(), reply_text);
done();
});
server.start();
server.requestCall(function(event) {
assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
var server_call = event.call;

View File

@ -48,6 +48,9 @@ describe('Interop tests', function() {
port = 'localhost:' + server_obj.port;
done();
});
after(function() {
server.shutdown();
});
// This depends on not using a binary stream
it('should pass empty_unary', function(done) {
interop_client.runTest(port, name_override, 'empty_unary', true, done);
@ -65,7 +68,7 @@ describe('Interop tests', function() {
it('should pass ping_pong', function(done) {
interop_client.runTest(port, name_override, 'ping_pong', true, done);
});
it.skip('should pass empty_stream', function(done) {
it('should pass empty_stream', function(done) {
interop_client.runTest(port, name_override, 'empty_stream', true, done);
});
});

View File

@ -33,7 +33,7 @@
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
var Server = require('../server');
var Server = require('../src/server');
/**
* This is used for testing functions with multiple asynchronous calls that
@ -65,17 +65,22 @@ function echoHandler(stream) {
}
describe('echo server', function() {
it('should echo inputs as responses', function(done) {
done = multiDone(done, 4);
var server = new Server();
var server;
var channel;
before(function() {
server = new Server();
var port_num = server.bind('[::]:0');
server.register('echo', echoHandler);
server.start();
channel = new grpc.Channel('localhost:' + port_num);
});
it('should echo inputs as responses', function(done) {
done = multiDone(done, 4);
var req_text = 'echo test string';
var status_text = 'OK';
var channel = new grpc.Channel('localhost:' + port_num);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var call = new grpc.Call(channel,

View File

@ -33,7 +33,9 @@
var assert = require('assert');
var surface_server = require('../surface_server.js');
var surface_server = require('../src/surface_server.js');
var surface_client = require('../src/surface_client.js');
var ProtoBuf = require('protobufjs');
@ -73,3 +75,54 @@ describe('Surface server constructor', function() {
}, /math.Math/);
});
});
describe('Surface client', function() {
var client;
var server;
before(function() {
var Server = grpc.buildServer([mathService]);
server = new Server({
'math.Math': {
'div': function(stream) {},
'divMany': function(stream) {},
'fib': function(stream) {},
'sum': function(stream) {}
}
});
var port = server.bind('localhost:0');
var Client = surface_client.makeClientConstructor(mathService);
client = new Client('localhost:' + port);
});
after(function() {
server.shutdown();
});
it('Should correctly cancel a unary call', function(done) {
var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
assert.strictEqual(err.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a client stream call', function(done) {
var call = client.sum(function(err, resp) {
assert.strictEqual(err.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a server stream call', function(done) {
var call = client.fib({'limit': 5});
call.on('status', function(status) {
assert.strictEqual(status.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a bidi stream call', function(done) {
var call = client.divMany();
call.on('status', function(status) {
assert.strictEqual(status.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
});