diff --git a/packages/grpc-native-core/src/client_interceptors.js b/packages/grpc-native-core/src/client_interceptors.js index ca9340dc..e5de547c 100644 --- a/packages/grpc-native-core/src/client_interceptors.js +++ b/packages/grpc-native-core/src/client_interceptors.js @@ -652,16 +652,22 @@ EndListener.prototype.onReceiveMessage = function(){}; EndListener.prototype.onReceiveStatus = function(){}; EndListener.prototype.recvMessageWithContext = function(){}; +var OP_DEPENDENCIES = { + [grpc.opType.SEND_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA], + [grpc.opType.SEND_CLOSE_FROM_CLIENT]: [grpc.opType.SEND_MESSAGE], + [grpc.opType.RECV_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA] +}; + /** * Produces a callback triggered by streaming response messages. * @private * @param {EventEmitter} emitter * @param {grpc.internal~Call} call - * @param {grpc~Listener} listener + * @param {function} get_listener Returns a grpc~Listener. * @param {grpc~deserialize} deserialize * @return {Function} */ -function _getStreamReadCallback(emitter, call, listener, deserialize) { +function _getStreamReadCallback(emitter, call, get_listener, deserialize) { return function (err, response) { if (err) { // Something has gone wrong. Stop reading and wait for status @@ -684,6 +690,7 @@ function _getStreamReadCallback(emitter, call, listener, deserialize) { emitter._readsDone(); return; } + var listener = get_listener(); var context = { call: call, listener: listener @@ -692,6 +699,66 @@ function _getStreamReadCallback(emitter, call, listener, deserialize) { }; } +/** + * Tests whether a batch can be started. + * @private + * @param {number[]} batch_ops The operations in the batch we are checking. + * @param {number[]} completed_ops Previously completed operations. + * @return {boolean} + */ +function _areBatchRequirementsMet(batch_ops, completed_ops) { + var dependencies = _.flatMap(batch_ops, function(op) { + return OP_DEPENDENCIES[op] || []; + }); + var dependencies_met = _.intersection(dependencies, + batch_ops.concat(completed_ops)); + return _.isEqual(dependencies_met.sort(), dependencies.sort()); +} + +/** + * Enforces the order of operations for synchronous requests. If a batch's + * operations cannot be started because required operations have not started + * yet, the batch is deferred until requirements are met. + * @private + * @param {grpc.Client~Call} call + * @param {object} batch + * @param {object} batch_state + * @param {number[]} [batch_state.completed_ops] The ops already sent. + * @param {object} [batch_state.deferred_batches] Batches to be sent after + * their dependencies are fulfilled. + * @param {function} callback + * @return {object} + */ +function _startBatchIfReady(call, batch, batch_state, callback) { + var completed_ops = batch_state.completed_ops; + var deferred_batches = batch_state.deferred_batches; + var batch_ops = _.map(_.keys(batch), Number); + if (_areBatchRequirementsMet(batch_ops, completed_ops)) { + // Dependencies are met, start the batch and any deferred batches whose + // dependencies are met as a result. + call.startBatch(batch, callback); + completed_ops = _.union(completed_ops, batch_ops); + deferred_batches = _.flatMap(deferred_batches, function(deferred_batch) { + var deferred_batch_ops = _.map(_.keys(deferred_batch), Number); + if (_areBatchRequirementsMet(deferred_batch_ops, completed_ops)) { + call.startBatch(deferred_batch.batch, deferred_batch.callback); + return []; + } + return [deferred_batch]; + }); + } else { + // Dependencies are not met, defer the batch + deferred_batches = deferred_batches.concat({ + batch: batch, + callback: callback + }); + } + return { + completed_ops: completed_ops, + deferred_batches: deferred_batches + }; +} + /** * Produces an interceptor which will start gRPC batches for unary calls. * @private @@ -708,19 +775,25 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) { var call = common.getCall(channel, method_definition.path, options); var first_listener; var final_requester = {}; + var batch_state = { + completed_ops: [], + deferred_batches: [] + }; final_requester.start = function (metadata, listener) { var batch = { [grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(), }; first_listener = listener; - call.startBatch(batch, function () { }); + batch_state = _startBatchIfReady(call, batch, batch_state, + function() {}); }; final_requester.sendMessage = function (message) { var batch = { [grpc.opType.SEND_MESSAGE]: serialize(message), }; - call.startBatch(batch, function () { }); + batch_state = _startBatchIfReady(call, batch, batch_state, + function() {}); }; final_requester.halfClose = function () { var batch = { @@ -729,7 +802,7 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) { [grpc.opType.RECV_MESSAGE]: true, [grpc.opType.RECV_STATUS_ON_CLIENT]: true }; - call.startBatch(batch, function (err, response) { + var callback = function (err, response) { response.status.metadata = Metadata._fromCoreRepresentation( response.status.metadata); var status = response.status; @@ -757,7 +830,8 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) { first_listener.onReceiveMetadata(response.metadata); first_listener.onReceiveMessage(deserialized); first_listener.onReceiveStatus(status); - }); + }; + batch_state = _startBatchIfReady(call, batch, batch_state, callback); }; final_requester.cancel = function () { call.cancel(); @@ -895,17 +969,24 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) { method_definition.responseDeserialize); var serialize = method_definition.requestSerialize; return function (options) { - var first_listener; + var batch_state = { + completed_ops: [], + deferred_batches: [] + }; var call = common.getCall(channel, method_definition.path, options); var final_requester = {}; + var first_listener; + var get_listener = function() { + return first_listener; + }; final_requester.start = function(metadata, listener) { first_listener = listener; metadata = metadata.clone(); var metadata_batch = { [grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(), - [grpc.opType.RECV_INITIAL_METADATA]: true, + [grpc.opType.RECV_INITIAL_METADATA]: true }; - call.startBatch(metadata_batch, function(err, response) { + var callback = function(err, response) { if (err) { // The call has stopped for some reason. A non-OK status will arrive // in the other batch. @@ -913,7 +994,9 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) { } first_listener.onReceiveMetadata( Metadata._fromCoreRepresentation(response.metadata)); - }); + }; + batch_state = _startBatchIfReady(call, metadata_batch, batch_state, + callback); var status_batch = { [grpc.opType.RECV_STATUS_ON_CLIENT]: true }; @@ -935,26 +1018,28 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) { var send_batch = { [grpc.opType.SEND_MESSAGE]: message }; - call.startBatch(send_batch, function(err, response) { + var callback = function(err, response) { if (err) { // The call has stopped for some reason. A non-OK status will arrive // in the other batch. return; } - }); + }; + batch_state = _startBatchIfReady(call, send_batch, batch_state, callback); }; final_requester.halfClose = function() { var batch = { [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true }; - call.startBatch(batch, function() {}); + batch_state = _startBatchIfReady(call, batch, batch_state, function() {}); }; final_requester.recvMessageWithContext = function(context) { var recv_batch = { [grpc.opType.RECV_MESSAGE]: true }; - call.startBatch(recv_batch, _getStreamReadCallback(emitter, call, - first_listener, deserialize)); + var callback = _getStreamReadCallback(emitter, call, + get_listener, deserialize); + batch_state = _startBatchIfReady(call, recv_batch, batch_state, callback); }; final_requester.cancel = function() { call.cancel(); @@ -981,6 +1066,9 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) { method_definition.responseDeserialize); return function (options) { var first_listener; + var get_listener = function() { + return first_listener; + }; var call = common.getCall(channel, method_definition.path, options); var final_requester = {}; final_requester.start = function (metadata, listener) { @@ -1057,7 +1145,7 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) { [grpc.opType.RECV_MESSAGE]: true }; call.startBatch(recv_batch, _getStreamReadCallback(emitter, call, - first_listener, deserialize)); + get_listener, deserialize)); }; final_requester.cancel = function() { call.cancel(); @@ -1144,11 +1232,13 @@ function _getServerStreamingListener(method_definition, emitter) { onReceiveMessage: function(message, next, context) { if (emitter.push(message) && message !== null) { var call = context.call; - var listener = context.listener; + var get_listener = function() { + return context.listener; + }; var read_batch = {}; read_batch[grpc.opType.RECV_MESSAGE] = true; call.startBatch(read_batch, _getStreamReadCallback(emitter, call, - listener, deserialize)); + get_listener, deserialize)); } else { emitter.reading = false; } @@ -1176,11 +1266,13 @@ function _getBidiStreamingListener(method_definition, emitter) { onReceiveMessage: function(message, next, context) { if (emitter.push(message) && message !== null) { var call = context.call; - var listener = context.listener; + var get_listener = function() { + return context.listener; + }; var read_batch = {}; read_batch[grpc.opType.RECV_MESSAGE] = true; call.startBatch(read_batch, _getStreamReadCallback(emitter, call, - listener, deserialize)); + get_listener, deserialize)); } else { emitter.reading = false; } diff --git a/packages/grpc-native-core/test/client_interceptors_test.js b/packages/grpc-native-core/test/client_interceptors_test.js index c07811d9..b14ec156 100644 --- a/packages/grpc-native-core/test/client_interceptors_test.js +++ b/packages/grpc-native-core/test/client_interceptors_test.js @@ -1702,4 +1702,94 @@ describe('Client interceptors', function() { bidi_stream.end(); }); }); + + describe('order of operations enforced for async interceptors', function() { + it('with unary call', function(done) { + var expected_calls = [ + 'close_b', + 'message_b', + 'start_b', + 'done' + ]; + var registry = new CallRegistry(done, expected_calls, true); + var message = {value: 'foo'}; + var interceptor_a = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + setTimeout(function() { next(metadata, listener); }, 50); + }, + sendMessage: function(message, next) { + setTimeout(function () { next(message); }, 10); + } + }); + }; + var interceptor_b = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + registry.addCall('start_b'); + next(metadata, listener); + }, + sendMessage: function(message, next) { + registry.addCall('message_b'); + next(message); + }, + halfClose: function(next) { + registry.addCall('close_b'); + next(); + } + }); + }; + var options = { + interceptors: [interceptor_a, interceptor_b] + }; + client.echo(message, options, function(err, response) { + assert.strictEqual(err, null); + registry.addCall('done'); + }); + }); + it('with serverStreaming call', function(done) { + var expected_calls = [ + 'close_b', + 'message_b', + 'start_b', + 'done' + ]; + var registry = new CallRegistry(done, expected_calls, true); + var message = {value: 'foo'}; + var interceptor_a = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + setTimeout(function() { next(metadata, listener); }, 50); + }, + sendMessage: function(message, next) { + setTimeout(function () { next(message); }, 10); + } + }); + }; + var interceptor_b = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + registry.addCall('start_b'); + next(metadata, listener); + }, + sendMessage: function(message, next) { + registry.addCall('message_b'); + next(message); + }, + halfClose: function(next) { + registry.addCall('close_b'); + next(); + } + }); + }; + var options = { + interceptors: [interceptor_a, interceptor_b] + }; + var stream = client.echoServerStream(message, options); + stream.on('data', function(response) { + assert.strictEqual(response.value, 'foo'); + registry.addCall('done'); + }); + }); + }); });