diff --git a/ext/call.cc b/ext/call.cc index 49179ab3..9453000a 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -508,9 +508,14 @@ void Call::DestroyCall() { } Call::Call(grpc_call *call) - : wrapped_call(call), pending_batches(0), has_final_op_completed(false) {} + : wrapped_call(call), pending_batches(0), has_final_op_completed(false) { + peer = grpc_call_get_peer(call); +} -Call::~Call() { DestroyCall(); } +Call::~Call() { + DestroyCall(); + gpr_free(peer); +} void Call::Init(Local exports) { HandleScope scope; @@ -662,6 +667,16 @@ NAN_METHOD(Call::StartBatch) { } Local callback_func = info[1].As(); Call *call = ObjectWrap::Unwrap(info.This()); + if (call->wrapped_call == NULL) { + /* This implies that the call has completed and has been destroyed. To + * emulate + * previous behavior, we should call the callback immediately with an error, + * as though the batch had failed in core */ + Local argv[] = { + Nan::Error("The async function failed because the call has completed")}; + Nan::Call(callback_func, Nan::New(), 1, argv); + return; + } Local obj = Nan::To(info[0]).ToLocalChecked(); Local keys = Nan::GetOwnPropertyNames(obj).ToLocalChecked(); size_t nops = keys->Length(); @@ -727,6 +742,11 @@ NAN_METHOD(Call::Cancel) { return Nan::ThrowTypeError("cancel can only be called on Call objects"); } Call *call = ObjectWrap::Unwrap(info.This()); + if (call->wrapped_call == NULL) { + /* Cancel is supposed to be idempotent. If the call has already finished, + * cancel should just complete silently */ + return; + } grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("cancel failed", error)); @@ -747,6 +767,11 @@ NAN_METHOD(Call::CancelWithStatus) { "cancelWithStatus's second argument must be a string"); } Call *call = ObjectWrap::Unwrap(info.This()); + if (call->wrapped_call == NULL) { + /* Cancel is supposed to be idempotent. If the call has already finished, + * cancel should just complete silently */ + return; + } grpc_status_code code = static_cast(Nan::To(info[0]).FromJust()); if (code == GRPC_STATUS_OK) { @@ -763,9 +788,7 @@ NAN_METHOD(Call::GetPeer) { return Nan::ThrowTypeError("getPeer can only be called on Call objects"); } Call *call = ObjectWrap::Unwrap(info.This()); - char *peer = grpc_call_get_peer(call->wrapped_call); - Local peer_value = Nan::New(peer).ToLocalChecked(); - gpr_free(peer); + Local peer_value = Nan::New(call->peer).ToLocalChecked(); info.GetReturnValue().Set(peer_value); } @@ -780,6 +803,10 @@ NAN_METHOD(Call::SetCredentials) { "setCredentials' first argument must be a CallCredentials"); } Call *call = ObjectWrap::Unwrap(info.This()); + if (call->wrapped_call == NULL) { + return Nan::ThrowError( + "Cannot set credentials on a call that has already started"); + } CallCredentials *creds_object = ObjectWrap::Unwrap( Nan::To(info[0]).ToLocalChecked()); grpc_call_credentials *creds = creds_object->GetWrappedCredentials(); diff --git a/ext/call.h b/ext/call.h index 0bd24f56..8f751279 100644 --- a/ext/call.h +++ b/ext/call.h @@ -96,6 +96,7 @@ class Call : public Nan::ObjectWrap { call, this is GRPC_OP_RECV_STATUS_ON_CLIENT and for a server call, this is GRPC_OP_SEND_STATUS_FROM_SERVER */ bool has_final_op_completed; + char *peer; }; class Op { diff --git a/src/client.js b/src/client.js index 16fe06a5..b79f0585 100644 --- a/src/client.js +++ b/src/client.js @@ -121,6 +121,12 @@ function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; var message; + var self = this; + if (this.writeFailed) { + /* Once a write fails, just call the callback immediately to let the caller + flush any pending writes. */ + setImmediate(callback); + } try { message = this.serialize(chunk); } catch (e) { @@ -141,8 +147,10 @@ function _write(chunk, encoding, callback) { batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, event) { if (err) { - // Something has gone wrong. Stop writing by failing to call callback - return; + /* Assume that the call is complete and that writing failed because a + status was received. In that case, set a flag to discard all future + writes */ + self.writeFailed = true; } callback(); }); diff --git a/test/common_test.js b/test/common_test.js index b7c2c6a8..db80207e 100644 --- a/test/common_test.js +++ b/test/common_test.js @@ -100,7 +100,6 @@ describe('Proto message long int serialize and deserialize', function() { var longNumDeserialize = deserializeCls(messages_proto.LongValues, num_options); var serialized = longSerialize({int_64: pos_value}); - console.log(longDeserialize(serialized)); assert.strictEqual(typeof longDeserialize(serialized).int_64, 'string'); /* With the longsAsStrings option disabled, long values are represented as * objects with 3 keys: low, high, and unsigned */ diff --git a/test/surface_test.js b/test/surface_test.js index d2f0511a..0696e7ae 100644 --- a/test/surface_test.js +++ b/test/surface_test.js @@ -1110,6 +1110,18 @@ describe('Other conditions', function() { done(); }); }); + it('after the call has fully completed', function(done) { + var peer; + var call = client.unary({error: false}, function(err, data) { + assert.ifError(err); + setImmediate(function() { + assert.strictEqual(peer, call.getPeer()); + done(); + }); + }); + peer = call.getPeer(); + assert.strictEqual(typeof peer, 'string'); + }); }); }); describe('Call propagation', function() { @@ -1352,4 +1364,17 @@ describe('Cancelling surface client', function() { }); call.cancel(); }); + it('Should be idempotent', function(done) { + var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) { + assert.strictEqual(err.code, surface_client.status.CANCELLED); + // Call asynchronously to try cancelling after call is fully completed + setImmediate(function() { + assert.doesNotThrow(function() { + call.cancel(); + }); + done(); + }); + }); + call.cancel(); + }); });