mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
Merge remote-tracking branch 'upstream/v1.3.x' into master_1.3.x_upmerge
This commit is contained in:
commit
75cc7de627
37
ext/call.cc
37
ext/call.cc
@ -508,9 +508,14 @@ void Call::DestroyCall() {
|
||||
}
|
||||
|
||||
Call::Call(grpc_call *call)
|
||||
: wrapped_call(call), pending_batches(0), has_final_op_completed(false) {}
|
||||
: wrapped_call(call), pending_batches(0), has_final_op_completed(false) {
|
||||
peer = grpc_call_get_peer(call);
|
||||
}
|
||||
|
||||
Call::~Call() { DestroyCall(); }
|
||||
Call::~Call() {
|
||||
DestroyCall();
|
||||
gpr_free(peer);
|
||||
}
|
||||
|
||||
void Call::Init(Local<Object> exports) {
|
||||
HandleScope scope;
|
||||
@ -662,6 +667,16 @@ NAN_METHOD(Call::StartBatch) {
|
||||
}
|
||||
Local<Function> callback_func = info[1].As<Function>();
|
||||
Call *call = ObjectWrap::Unwrap<Call>(info.This());
|
||||
if (call->wrapped_call == NULL) {
|
||||
/* This implies that the call has completed and has been destroyed. To
|
||||
* emulate
|
||||
* previous behavior, we should call the callback immediately with an error,
|
||||
* as though the batch had failed in core */
|
||||
Local<Value> argv[] = {
|
||||
Nan::Error("The async function failed because the call has completed")};
|
||||
Nan::Call(callback_func, Nan::New<Object>(), 1, argv);
|
||||
return;
|
||||
}
|
||||
Local<Object> obj = Nan::To<Object>(info[0]).ToLocalChecked();
|
||||
Local<Array> keys = Nan::GetOwnPropertyNames(obj).ToLocalChecked();
|
||||
size_t nops = keys->Length();
|
||||
@ -727,6 +742,11 @@ NAN_METHOD(Call::Cancel) {
|
||||
return Nan::ThrowTypeError("cancel can only be called on Call objects");
|
||||
}
|
||||
Call *call = ObjectWrap::Unwrap<Call>(info.This());
|
||||
if (call->wrapped_call == NULL) {
|
||||
/* Cancel is supposed to be idempotent. If the call has already finished,
|
||||
* cancel should just complete silently */
|
||||
return;
|
||||
}
|
||||
grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL);
|
||||
if (error != GRPC_CALL_OK) {
|
||||
return Nan::ThrowError(nanErrorWithCode("cancel failed", error));
|
||||
@ -747,6 +767,11 @@ NAN_METHOD(Call::CancelWithStatus) {
|
||||
"cancelWithStatus's second argument must be a string");
|
||||
}
|
||||
Call *call = ObjectWrap::Unwrap<Call>(info.This());
|
||||
if (call->wrapped_call == NULL) {
|
||||
/* Cancel is supposed to be idempotent. If the call has already finished,
|
||||
* cancel should just complete silently */
|
||||
return;
|
||||
}
|
||||
grpc_status_code code =
|
||||
static_cast<grpc_status_code>(Nan::To<uint32_t>(info[0]).FromJust());
|
||||
if (code == GRPC_STATUS_OK) {
|
||||
@ -763,9 +788,7 @@ NAN_METHOD(Call::GetPeer) {
|
||||
return Nan::ThrowTypeError("getPeer can only be called on Call objects");
|
||||
}
|
||||
Call *call = ObjectWrap::Unwrap<Call>(info.This());
|
||||
char *peer = grpc_call_get_peer(call->wrapped_call);
|
||||
Local<Value> peer_value = Nan::New(peer).ToLocalChecked();
|
||||
gpr_free(peer);
|
||||
Local<Value> peer_value = Nan::New(call->peer).ToLocalChecked();
|
||||
info.GetReturnValue().Set(peer_value);
|
||||
}
|
||||
|
||||
@ -780,6 +803,10 @@ NAN_METHOD(Call::SetCredentials) {
|
||||
"setCredentials' first argument must be a CallCredentials");
|
||||
}
|
||||
Call *call = ObjectWrap::Unwrap<Call>(info.This());
|
||||
if (call->wrapped_call == NULL) {
|
||||
return Nan::ThrowError(
|
||||
"Cannot set credentials on a call that has already started");
|
||||
}
|
||||
CallCredentials *creds_object = ObjectWrap::Unwrap<CallCredentials>(
|
||||
Nan::To<Object>(info[0]).ToLocalChecked());
|
||||
grpc_call_credentials *creds = creds_object->GetWrappedCredentials();
|
||||
|
||||
@ -96,6 +96,7 @@ class Call : public Nan::ObjectWrap {
|
||||
call, this is GRPC_OP_RECV_STATUS_ON_CLIENT and for a server call, this
|
||||
is GRPC_OP_SEND_STATUS_FROM_SERVER */
|
||||
bool has_final_op_completed;
|
||||
char *peer;
|
||||
};
|
||||
|
||||
class Op {
|
||||
|
||||
@ -121,6 +121,12 @@ function _write(chunk, encoding, callback) {
|
||||
/* jshint validthis: true */
|
||||
var batch = {};
|
||||
var message;
|
||||
var self = this;
|
||||
if (this.writeFailed) {
|
||||
/* Once a write fails, just call the callback immediately to let the caller
|
||||
flush any pending writes. */
|
||||
setImmediate(callback);
|
||||
}
|
||||
try {
|
||||
message = this.serialize(chunk);
|
||||
} catch (e) {
|
||||
@ -141,8 +147,10 @@ function _write(chunk, encoding, callback) {
|
||||
batch[grpc.opType.SEND_MESSAGE] = message;
|
||||
this.call.startBatch(batch, function(err, event) {
|
||||
if (err) {
|
||||
// Something has gone wrong. Stop writing by failing to call callback
|
||||
return;
|
||||
/* Assume that the call is complete and that writing failed because a
|
||||
status was received. In that case, set a flag to discard all future
|
||||
writes */
|
||||
self.writeFailed = true;
|
||||
}
|
||||
callback();
|
||||
});
|
||||
|
||||
@ -100,7 +100,6 @@ describe('Proto message long int serialize and deserialize', function() {
|
||||
var longNumDeserialize = deserializeCls(messages_proto.LongValues,
|
||||
num_options);
|
||||
var serialized = longSerialize({int_64: pos_value});
|
||||
console.log(longDeserialize(serialized));
|
||||
assert.strictEqual(typeof longDeserialize(serialized).int_64, 'string');
|
||||
/* With the longsAsStrings option disabled, long values are represented as
|
||||
* objects with 3 keys: low, high, and unsigned */
|
||||
|
||||
@ -1110,6 +1110,18 @@ describe('Other conditions', function() {
|
||||
done();
|
||||
});
|
||||
});
|
||||
it('after the call has fully completed', function(done) {
|
||||
var peer;
|
||||
var call = client.unary({error: false}, function(err, data) {
|
||||
assert.ifError(err);
|
||||
setImmediate(function() {
|
||||
assert.strictEqual(peer, call.getPeer());
|
||||
done();
|
||||
});
|
||||
});
|
||||
peer = call.getPeer();
|
||||
assert.strictEqual(typeof peer, 'string');
|
||||
});
|
||||
});
|
||||
});
|
||||
describe('Call propagation', function() {
|
||||
@ -1352,4 +1364,17 @@ describe('Cancelling surface client', function() {
|
||||
});
|
||||
call.cancel();
|
||||
});
|
||||
it('Should be idempotent', function(done) {
|
||||
var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
|
||||
assert.strictEqual(err.code, surface_client.status.CANCELLED);
|
||||
// Call asynchronously to try cancelling after call is fully completed
|
||||
setImmediate(function() {
|
||||
assert.doesNotThrow(function() {
|
||||
call.cancel();
|
||||
});
|
||||
done();
|
||||
});
|
||||
});
|
||||
call.cancel();
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user