From 1aa11525fdaa4add2555f27a484cbc9558b324e9 Mon Sep 17 00:00:00 2001 From: cjihrig Date: Thu, 16 May 2019 11:23:54 -0400 Subject: [PATCH] grpc-js: add bidirectional streaming RPC support This commit adds bidi streaming RPC support to the server. --- packages/grpc-js/src/server-call.ts | 20 +++++- packages/grpc-js/src/server.ts | 9 ++- packages/grpc-js/test/test-server-errors.ts | 72 +++++++++++++++++++++ packages/grpc-js/test/test-server.ts | 16 +++++ 4 files changed, 114 insertions(+), 3 deletions(-) diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index b2da85c8..8ecb91d2 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -204,13 +204,21 @@ export class ServerWritableStreamImpl extends export class ServerDuplexStreamImpl extends Duplex implements ServerDuplexStream { cancelled: boolean; + private trailingMetadata: Metadata; constructor( private call: Http2ServerCallStream, - public metadata: Metadata, private _serialize: Serialize, - private _deserialize: Deserialize) { + public metadata: Metadata, public serialize: Serialize, + public deserialize: Deserialize) { super({objectMode: true}); this.cancelled = false; + this.trailingMetadata = new Metadata(); + this.call.setupReadable(this); + + this.on('error', (err) => { + this.call.sendError(err as ServiceError); + this.end(); + }); } getPeer(): string { @@ -222,6 +230,14 @@ export class ServerDuplexStreamImpl extends Duplex } } +ServerDuplexStreamImpl.prototype._read = + ServerReadableStreamImpl.prototype._read; +ServerDuplexStreamImpl.prototype._write = + ServerWritableStreamImpl.prototype._write; +ServerDuplexStreamImpl.prototype._final = + ServerWritableStreamImpl.prototype._final; +ServerDuplexStreamImpl.prototype.end = ServerWritableStreamImpl.prototype.end; + // Unary response callback signature. export type sendUnaryData = diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 68f6e149..8331fe1f 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -355,5 +355,12 @@ function handleBidiStreaming( call: Http2ServerCallStream, handler: BidiStreamingHandler, metadata: Metadata): void { - throw new Error('not implemented yet'); + const stream = new ServerDuplexStreamImpl( + call, metadata, handler.serialize, handler.deserialize); + + if (call.cancelled) { + return; + } + + handler.func(stream); } diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts index 6f352f73..76e63a6a 100644 --- a/packages/grpc-js/test/test-server-errors.ts +++ b/packages/grpc-js/test/test-server-errors.ts @@ -147,6 +147,20 @@ describe('Client malformed response handling', () => { done(); }); }); + + it('should get an INTERNAL status with a bidi stream call', (done) => { + const call = client.bidiStream(); + + call.on('data', noop); + call.on('error', (err: ServiceError) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + + call.write({}); + call.end(); + }); }); describe('Server serialization failure handling', () => { @@ -444,6 +458,23 @@ describe('Other conditions', () => { done(); }); }); + + it('should respond correctly to a bidi stream', (done) => { + const call = misbehavingClient.bidiStream(); + + call.on('data', (data: any) => { + assert.fail(data); + }); + + call.on('error', (err: ServiceError) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + + call.write(badArg); + call.end(); + }); }); describe('Trailing metadata', () => { @@ -561,6 +592,33 @@ describe('Other conditions', () => { done(); }); }); + + it('should be present when a bidi stream succeeds', (done) => { + const call = client.bidiStream(); + + call.write({error: false}); + call.write({error: false}); + call.end(); + call.on('data', noop); + call.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.OK); + assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']); + done(); + }); + }); + + it('should be present when a bidi stream fails', (done) => { + const call = client.bidiStream(); + + call.write({error: false}); + call.write({error: true}); + call.end(); + call.on('data', noop); + call.on('error', (error: ServiceError) => { + assert.deepStrictEqual(error.metadata.get('trailer-present'), ['yes']); + done(); + }); + }); }); describe('Error object should contain the status', () => { @@ -597,6 +655,20 @@ describe('Other conditions', () => { }); }); + it('for a bidi stream call', (done) => { + const call = client.bidiStream(); + + call.write({error: false}); + call.write({error: true}); + call.end(); + call.on('data', noop); + call.on('error', (error: ServiceError) => { + assert.strictEqual(error.code, grpc.status.UNKNOWN); + assert.strictEqual(error.details, 'Requested error'); + done(); + }); + }); + it('for a UTF-8 error message', (done) => { client.unary( {error: true, message: '測試字符串'}, diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index d310ff07..6bb7c1ff 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -286,6 +286,22 @@ describe('Server', () => { done(); }); }); + + it('should respond to a bidi call with UNIMPLEMENTED', (done) => { + const call = client.divMany(); + + call.on('data', (value: any) => { + assert.fail('No messages expected'); + }); + + call.on('error', (err: ServiceError) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + + call.end(); + }); }); });