mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
Merge branch 'new_invoke_api' of github.com:google/grpc into update-api
This commit is contained in:
commit
00993de85a
31
call.cc
31
call.cc
@ -78,8 +78,8 @@ void Call::Init(Handle<Object> exports) {
|
||||
tpl->InstanceTemplate()->SetInternalFieldCount(1);
|
||||
NanSetPrototypeTemplate(tpl, "addMetadata",
|
||||
FunctionTemplate::New(AddMetadata)->GetFunction());
|
||||
NanSetPrototypeTemplate(tpl, "startInvoke",
|
||||
FunctionTemplate::New(StartInvoke)->GetFunction());
|
||||
NanSetPrototypeTemplate(tpl, "invoke",
|
||||
FunctionTemplate::New(Invoke)->GetFunction());
|
||||
NanSetPrototypeTemplate(tpl, "serverAccept",
|
||||
FunctionTemplate::New(ServerAccept)->GetFunction());
|
||||
NanSetPrototypeTemplate(
|
||||
@ -203,37 +203,30 @@ NAN_METHOD(Call::AddMetadata) {
|
||||
NanReturnUndefined();
|
||||
}
|
||||
|
||||
NAN_METHOD(Call::StartInvoke) {
|
||||
NAN_METHOD(Call::Invoke) {
|
||||
NanScope();
|
||||
if (!HasInstance(args.This())) {
|
||||
return NanThrowTypeError("startInvoke can only be called on Call objects");
|
||||
return NanThrowTypeError("invoke can only be called on Call objects");
|
||||
}
|
||||
if (!args[0]->IsFunction()) {
|
||||
return NanThrowTypeError("StartInvoke's first argument must be a function");
|
||||
return NanThrowTypeError("invoke's first argument must be a function");
|
||||
}
|
||||
if (!args[1]->IsFunction()) {
|
||||
return NanThrowTypeError(
|
||||
"StartInvoke's second argument must be a function");
|
||||
return NanThrowTypeError("invoke's second argument must be a function");
|
||||
}
|
||||
if (!args[2]->IsFunction()) {
|
||||
return NanThrowTypeError("StartInvoke's third argument must be a function");
|
||||
}
|
||||
if (!args[3]->IsUint32()) {
|
||||
return NanThrowTypeError(
|
||||
"StartInvoke's fourth argument must be integer flags");
|
||||
if (!args[2]->IsUint32()) {
|
||||
return NanThrowTypeError("invoke's third argument must be integer flags");
|
||||
}
|
||||
Call *call = ObjectWrap::Unwrap<Call>(args.This());
|
||||
unsigned int flags = args[3]->Uint32Value();
|
||||
grpc_call_error error = grpc_call_start_invoke(
|
||||
grpc_call_error error = grpc_call_invoke(
|
||||
call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
|
||||
CreateTag(args[0], args.This()), CreateTag(args[1], args.This()),
|
||||
CreateTag(args[2], args.This()), flags);
|
||||
CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags);
|
||||
if (error == GRPC_CALL_OK) {
|
||||
CompletionQueueAsyncWorker::Next();
|
||||
CompletionQueueAsyncWorker::Next();
|
||||
CompletionQueueAsyncWorker::Next();
|
||||
} else {
|
||||
return NanThrowError("startInvoke failed", error);
|
||||
return NanThrowError("invoke failed", error);
|
||||
}
|
||||
NanReturnUndefined();
|
||||
}
|
||||
@ -281,7 +274,7 @@ NAN_METHOD(Call::ServerEndInitialMetadata) {
|
||||
NAN_METHOD(Call::Cancel) {
|
||||
NanScope();
|
||||
if (!HasInstance(args.This())) {
|
||||
return NanThrowTypeError("startInvoke can only be called on Call objects");
|
||||
return NanThrowTypeError("cancel can only be called on Call objects");
|
||||
}
|
||||
Call *call = ObjectWrap::Unwrap<Call>(args.This());
|
||||
grpc_call_error error = grpc_call_cancel(call->wrapped_call);
|
||||
|
||||
2
call.h
2
call.h
@ -61,7 +61,7 @@ class Call : public ::node::ObjectWrap {
|
||||
|
||||
static NAN_METHOD(New);
|
||||
static NAN_METHOD(AddMetadata);
|
||||
static NAN_METHOD(StartInvoke);
|
||||
static NAN_METHOD(Invoke);
|
||||
static NAN_METHOD(ServerAccept);
|
||||
static NAN_METHOD(ServerEndInitialMetadata);
|
||||
static NAN_METHOD(Cancel);
|
||||
|
||||
99
client.js
99
client.js
@ -50,101 +50,53 @@ util.inherits(GrpcClientStream, Duplex);
|
||||
function GrpcClientStream(call, options) {
|
||||
Duplex.call(this, options);
|
||||
var self = this;
|
||||
// Indicates that we can start reading and have not received a null read
|
||||
var can_read = false;
|
||||
var finished = false;
|
||||
// Indicates that a read is currently pending
|
||||
var reading = false;
|
||||
// Indicates that we can call startWrite
|
||||
var can_write = false;
|
||||
// Indicates that a write is currently pending
|
||||
var writing = false;
|
||||
this._call = call;
|
||||
/**
|
||||
* Callback to handle receiving a READ event. Pushes the data from that event
|
||||
* onto the read queue and starts reading again if applicable.
|
||||
* @param {grpc.Event} event The READ event object
|
||||
* Callback to be called when a READ event is received. Pushes the data onto
|
||||
* the read queue and starts reading again if applicable
|
||||
* @param {grpc.Event} event READ event object
|
||||
*/
|
||||
function readCallback(event) {
|
||||
if (finished) {
|
||||
self.push(null);
|
||||
return;
|
||||
}
|
||||
var data = event.data;
|
||||
if (self.push(data)) {
|
||||
if (data == null) {
|
||||
// Disable starting to read after null read was received
|
||||
can_read = false;
|
||||
reading = false;
|
||||
} else {
|
||||
call.startRead(readCallback);
|
||||
}
|
||||
if (self.push(data) && data != null) {
|
||||
self._call.startRead(readCallback);
|
||||
} else {
|
||||
// Indicate that reading can be resumed by calling startReading
|
||||
reading = false;
|
||||
}
|
||||
};
|
||||
/**
|
||||
* Initiate a read, which continues until self.push returns false (indicating
|
||||
* that reading should be paused) or data is null (indicating that there is no
|
||||
* more data to read).
|
||||
*/
|
||||
function startReading() {
|
||||
call.startRead(readCallback);
|
||||
}
|
||||
// TODO(mlumish): possibly change queue implementation due to shift slowness
|
||||
var write_queue = [];
|
||||
/**
|
||||
* Write the next chunk of data in the write queue if there is one. Otherwise
|
||||
* indicate that there is no pending write. When the write succeeds, this
|
||||
* function is called again.
|
||||
*/
|
||||
function writeNext() {
|
||||
if (write_queue.length > 0) {
|
||||
writing = true;
|
||||
var next = write_queue.shift();
|
||||
var writeCallback = function(event) {
|
||||
next.callback();
|
||||
writeNext();
|
||||
};
|
||||
call.startWrite(next.chunk, writeCallback, 0);
|
||||
} else {
|
||||
writing = false;
|
||||
}
|
||||
}
|
||||
call.startInvoke(function(event) {
|
||||
can_read = true;
|
||||
can_write = true;
|
||||
startReading();
|
||||
writeNext();
|
||||
}, function(event) {
|
||||
call.invoke(function(event) {
|
||||
self.emit('metadata', event.data);
|
||||
}, function(event) {
|
||||
finished = true;
|
||||
self.emit('status', event.data);
|
||||
}, 0);
|
||||
this.on('finish', function() {
|
||||
call.writesDone(function() {});
|
||||
});
|
||||
/**
|
||||
* Indicate that reads should start, and start them if the INVOKE_ACCEPTED
|
||||
* event has been received.
|
||||
* Start reading if there is not already a pending read. Reading will
|
||||
* continue until self.push returns false (indicating reads should slow
|
||||
* down) or the read data is null (indicating that there is no more data).
|
||||
*/
|
||||
this._enableRead = function() {
|
||||
if (!reading) {
|
||||
reading = true;
|
||||
if (can_read) {
|
||||
startReading();
|
||||
this.startReading = function() {
|
||||
if (finished) {
|
||||
self.push(null);
|
||||
} else {
|
||||
if (!reading) {
|
||||
reading = true;
|
||||
self._call.startRead(readCallback);
|
||||
}
|
||||
}
|
||||
};
|
||||
/**
|
||||
* Push the chunk onto the write queue, and write from the write queue if
|
||||
* there is not a pending write
|
||||
* @param {Buffer} chunk The chunk of data to write
|
||||
* @param {function(Error=)} callback The callback to call when the write
|
||||
* completes
|
||||
*/
|
||||
this._tryWrite = function(chunk, callback) {
|
||||
write_queue.push({chunk: chunk, callback: callback});
|
||||
if (can_write && !writing) {
|
||||
writeNext();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@ -153,7 +105,7 @@ function GrpcClientStream(call, options) {
|
||||
* @param {number} size Ignored
|
||||
*/
|
||||
GrpcClientStream.prototype._read = function(size) {
|
||||
this._enableRead();
|
||||
this.startReading();
|
||||
};
|
||||
|
||||
/**
|
||||
@ -164,7 +116,10 @@ GrpcClientStream.prototype._read = function(size) {
|
||||
* @param {function(Error=)} callback Ignored
|
||||
*/
|
||||
GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
|
||||
this._tryWrite(chunk, callback);
|
||||
var self = this;
|
||||
self._call.startWrite(chunk, function(event) {
|
||||
callback();
|
||||
}, 0);
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@ -148,8 +148,6 @@ void InitCompletionTypeConstants(Handle<Object> exports) {
|
||||
completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
|
||||
Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
|
||||
completion_type->Set(NanNew("READ"), READ);
|
||||
Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED));
|
||||
completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED);
|
||||
Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
|
||||
completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
|
||||
Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
|
||||
|
||||
@ -118,12 +118,11 @@ describe('call', function() {
|
||||
call.addMetadata(5);
|
||||
}, TypeError);
|
||||
});
|
||||
it('should fail if startInvoke was already called', function(done) {
|
||||
it('should fail if invoke was already called', function(done) {
|
||||
var call = new grpc.Call(channel, 'method', getDeadline(1));
|
||||
call.startInvoke(function() {},
|
||||
function() {},
|
||||
function() {done();},
|
||||
0);
|
||||
call.invoke(function() {},
|
||||
function() {done();},
|
||||
0);
|
||||
assert.throws(function() {
|
||||
call.addMetadata({'key' : 'key', 'value' : new Buffer('value') });
|
||||
}, function(err) {
|
||||
@ -133,32 +132,26 @@ describe('call', function() {
|
||||
call.cancel();
|
||||
});
|
||||
});
|
||||
describe('startInvoke', function() {
|
||||
it('should fail with fewer than 4 arguments', function() {
|
||||
describe('invoke', function() {
|
||||
it('should fail with fewer than 3 arguments', function() {
|
||||
var call = new grpc.Call(channel, 'method', getDeadline(1));
|
||||
assert.throws(function() {
|
||||
call.startInvoke();
|
||||
call.invoke();
|
||||
}, TypeError);
|
||||
assert.throws(function() {
|
||||
call.startInvoke(function() {});
|
||||
call.invoke(function() {});
|
||||
}, TypeError);
|
||||
assert.throws(function() {
|
||||
call.startInvoke(function() {},
|
||||
function() {});
|
||||
}, TypeError);
|
||||
assert.throws(function() {
|
||||
call.startInvoke(function() {},
|
||||
function() {},
|
||||
function() {});
|
||||
call.invoke(function() {},
|
||||
function() {});
|
||||
}, TypeError);
|
||||
});
|
||||
it('should work with 3 args and an int', function(done) {
|
||||
it('should work with 2 args and an int', function(done) {
|
||||
assert.doesNotThrow(function() {
|
||||
var call = new grpc.Call(channel, 'method', getDeadline(1));
|
||||
call.startInvoke(function() {},
|
||||
function() {},
|
||||
function() {done();},
|
||||
0);
|
||||
call.invoke(function() {},
|
||||
function() {done();},
|
||||
0);
|
||||
// Cancel to speed up the test
|
||||
call.cancel();
|
||||
});
|
||||
@ -166,12 +159,11 @@ describe('call', function() {
|
||||
it('should reject incorrectly typed arguments', function() {
|
||||
var call = new grpc.Call(channel, 'method', getDeadline(1));
|
||||
assert.throws(function() {
|
||||
call.startInvoke(0, 0, 0, 0);
|
||||
call.invoke(0, 0, 0);
|
||||
}, TypeError);
|
||||
assert.throws(function() {
|
||||
call.startInvoke(function() {},
|
||||
function() {},
|
||||
function() {}, 'test');
|
||||
call.invoke(function() {},
|
||||
function() {}, 'test');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -94,7 +94,6 @@ var opErrorNames = [
|
||||
var completionTypeNames = [
|
||||
'QUEUE_SHUTDOWN',
|
||||
'READ',
|
||||
'INVOKE_ACCEPTED',
|
||||
'WRITE_ACCEPTED',
|
||||
'FINISH_ACCEPTED',
|
||||
'CLIENT_METADATA_READ',
|
||||
|
||||
@ -72,16 +72,7 @@ describe('end-to-end', function() {
|
||||
var call = new grpc.Call(channel,
|
||||
'dummy_method',
|
||||
deadline);
|
||||
call.startInvoke(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.INVOKE_ACCEPTED);
|
||||
|
||||
call.writesDone(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.FINISH_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
});
|
||||
},function(event) {
|
||||
call.invoke(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.CLIENT_METADATA_READ);
|
||||
},function(event) {
|
||||
@ -91,6 +82,11 @@ describe('end-to-end', function() {
|
||||
assert.strictEqual(status.details, status_text);
|
||||
done();
|
||||
}, 0);
|
||||
call.writesDone(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.FINISH_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
});
|
||||
|
||||
server.start();
|
||||
server.requestCall(function(event) {
|
||||
@ -131,28 +127,7 @@ describe('end-to-end', function() {
|
||||
var call = new grpc.Call(channel,
|
||||
'dummy_method',
|
||||
deadline);
|
||||
call.startInvoke(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.INVOKE_ACCEPTED);
|
||||
call.startWrite(
|
||||
new Buffer(req_text),
|
||||
function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.WRITE_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
call.writesDone(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.FINISH_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
done();
|
||||
});
|
||||
}, 0);
|
||||
call.startRead(function(event) {
|
||||
assert.strictEqual(event.type, grpc.completionType.READ);
|
||||
assert.strictEqual(event.data.toString(), reply_text);
|
||||
done();
|
||||
});
|
||||
},function(event) {
|
||||
call.invoke(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.CLIENT_METADATA_READ);
|
||||
done();
|
||||
@ -163,6 +138,24 @@ describe('end-to-end', function() {
|
||||
assert.strictEqual(status.details, status_text);
|
||||
done();
|
||||
}, 0);
|
||||
call.startWrite(
|
||||
new Buffer(req_text),
|
||||
function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.WRITE_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
call.writesDone(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.FINISH_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
done();
|
||||
});
|
||||
}, 0);
|
||||
call.startRead(function(event) {
|
||||
assert.strictEqual(event.type, grpc.completionType.READ);
|
||||
assert.strictEqual(event.data.toString(), reply_text);
|
||||
done();
|
||||
});
|
||||
|
||||
server.start();
|
||||
server.requestCall(function(event) {
|
||||
|
||||
@ -83,28 +83,7 @@ describe('echo server', function() {
|
||||
var call = new grpc.Call(channel,
|
||||
'echo',
|
||||
deadline);
|
||||
call.startInvoke(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.INVOKE_ACCEPTED);
|
||||
call.startWrite(
|
||||
new Buffer(req_text),
|
||||
function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.WRITE_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
call.writesDone(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.FINISH_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
done();
|
||||
});
|
||||
}, 0);
|
||||
call.startRead(function(event) {
|
||||
assert.strictEqual(event.type, grpc.completionType.READ);
|
||||
assert.strictEqual(event.data.toString(), req_text);
|
||||
done();
|
||||
});
|
||||
},function(event) {
|
||||
call.invoke(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.CLIENT_METADATA_READ);
|
||||
done();
|
||||
@ -116,6 +95,24 @@ describe('echo server', function() {
|
||||
server.shutdown();
|
||||
done();
|
||||
}, 0);
|
||||
call.startWrite(
|
||||
new Buffer(req_text),
|
||||
function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.WRITE_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
call.writesDone(function(event) {
|
||||
assert.strictEqual(event.type,
|
||||
grpc.completionType.FINISH_ACCEPTED);
|
||||
assert.strictEqual(event.data, grpc.opError.OK);
|
||||
done();
|
||||
});
|
||||
}, 0);
|
||||
call.startRead(function(event) {
|
||||
assert.strictEqual(event.type, grpc.completionType.READ);
|
||||
assert.strictEqual(event.data.toString(), req_text);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user