Removed server-wide metadata handler, individual handlers can now send metadata

This commit is contained in:
murgatroid99 2015-07-16 13:16:14 -07:00
parent afad0c0fca
commit 024295e6a5
2 changed files with 56 additions and 18 deletions

View File

@ -172,7 +172,7 @@ function getServer(port, tls) {
key_data,
pem_data);
}
var server = new grpc.Server(null, options);
var server = new grpc.Server(options);
server.addProtoService(testProto.TestService.service, {
emptyCall: handleEmpty,
unaryCall: handleUnary,

View File

@ -72,6 +72,9 @@ function handleError(call, error) {
status.metadata = error.metadata;
}
var error_batch = {};
if (!call.metadataSent) {
error_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
}
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(error_batch, function(){});
}
@ -115,6 +118,10 @@ function sendUnaryResponse(call, value, serialize, metadata) {
if (metadata) {
status.metadata = metadata;
}
if (!call.metadataSent) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
call.metadataSent = true;
}
end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
@ -136,6 +143,10 @@ function setUpWritable(stream, serialize) {
stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() {
var batch = {};
if (!stream.call.metadataSent) {
stream.call.metadataSent = true;
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
}
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
stream.call.startBatch(batch, function(){});
}
@ -239,6 +250,10 @@ function ServerWritableStream(call, serialize) {
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
if (!this.call.metadataSent) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
this.call.metadataSent = true;
}
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
this.call.startBatch(batch, function(err, value) {
if (err) {
@ -251,6 +266,23 @@ function _write(chunk, encoding, callback) {
ServerWritableStream.prototype._write = _write;
function sendMetadata(responseMetadata) {
/* jshint validthis: true */
if (!this.call.metadataSent) {
this.call.metadataSent = true;
var batch = [];
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
this.call.startBatch(batch, function(err) {
if (err) {
this.emit('error', err);
return;
}
});
}
}
ServerWritableStream.prototype.sendMetadata = sendMetadata;
util.inherits(ServerReadableStream, Readable);
/**
@ -339,6 +371,7 @@ function ServerDuplexStream(call, serialize, deserialize) {
ServerDuplexStream.prototype._read = _read;
ServerDuplexStream.prototype._write = _write;
ServerDuplexStream.prototype.sendMetadata = sendMetadata;
/**
* Fully handle a unary call
@ -348,12 +381,20 @@ ServerDuplexStream.prototype._write = _write;
*/
function handleUnary(call, handler, metadata) {
var emitter = new EventEmitter();
emitter.sendMetadata = function(responseMetadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
call.startBatch(batch, function() {});
}
};
emitter.on('error', function(error) {
handleError(call, error);
});
emitter.metadata = metadata;
waitForCancel(call, emitter);
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
if (err) {
@ -392,8 +433,8 @@ function handleUnary(call, handler, metadata) {
function handleServerStreaming(call, handler, metadata) {
var stream = new ServerWritableStream(call, handler.serialize);
waitForCancel(call, stream);
stream.metadata = metadata;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
if (err) {
@ -419,13 +460,19 @@ function handleServerStreaming(call, handler, metadata) {
*/
function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, handler.deserialize);
stream.sendMetadata = function(responseMetadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
call.startBatch(batch, function() {});
}
};
stream.on('error', function(error) {
handleError(call, error);
});
waitForCancel(call, stream);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
call.startBatch(metadata_batch, function() {});
stream.metadata = metadata;
handler.func(stream, function(err, value, trailer) {
stream.terminate();
if (err) {
@ -449,9 +496,7 @@ function handleBidiStreaming(call, handler, metadata) {
var stream = new ServerDuplexStream(call, handler.serialize,
handler.deserialize);
waitForCancel(call, stream);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
call.startBatch(metadata_batch, function() {});
stream.metadata = metadata;
handler.func(stream);
}
@ -466,13 +511,10 @@ var streamHandlers = {
* Constructs a server object that stores request handlers and delegates
* incoming requests to those handlers
* @constructor
* @param {function(string, Object<string, Array<Buffer>>):
Object<string, Array<Buffer|string>>=} getMetadata Callback that gets
* metatada for a given method
* @param {Object=} options Options that should be passed to the internal server
* implementation
*/
function Server(getMetadata, options) {
function Server(options) {
this.handlers = {};
var handlers = this.handlers;
var server = new grpc.Server(options);
@ -525,11 +567,7 @@ function Server(getMetadata, options) {
call.startBatch(batch, function() {});
return;
}
var response_metadata = {};
if (getMetadata) {
response_metadata = getMetadata(method, metadata);
}
streamHandlers[handler.type](call, handler, response_metadata);
streamHandlers[handler.type](call, handler, metadata);
}
server.requestCall(handleNewCall);
};