From b3b6310f041aef2770f4a7945453e4db2b5cd113 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 10 Jan 2023 15:24:22 -0800 Subject: [PATCH] grpc-js: Don't end calls when receiving GOAWAY --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/transport.ts | 38 ++++++++++++---- packages/grpc-js/test/test-server.ts | 67 +++++++++++++++++++++++----- 3 files changed, 87 insertions(+), 20 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index c11b4dc4..700c7b77 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.2", + "version": "1.8.3", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index e713ed6e..64f77094 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -161,7 +161,7 @@ class Http2Transport implements Transport { session.once('close', () => { this.trace('session closed'); this.stopKeepalivePings(); - this.handleDisconnect(false); + this.handleDisconnect(); }); session.once('goaway', (errorCode: number, lastStreamID: number, opaqueData: Buffer) => { let tooManyPings = false; @@ -177,7 +177,7 @@ class Http2Transport implements Transport { 'connection closed by GOAWAY with code ' + errorCode ); - this.handleDisconnect(tooManyPings); + this.reportDisconnectToOwner(tooManyPings); }); session.once('error', error => { /* Do nothing here. Any error should also trigger a close event, which is @@ -263,15 +263,35 @@ class Http2Transport implements Transport { logging.trace(LogVerbosity.DEBUG, 'transport_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); } - private handleDisconnect(tooManyPings: boolean) { + /** + * Indicate to the owner of this object that this transport should no longer + * be used. That happens if the connection drops, or if the server sends a + * GOAWAY. + * @param tooManyPings If true, this was triggered by a GOAWAY with data + * indicating that the session was closed becaues the client sent too many + * pings. + * @returns + */ + private reportDisconnectToOwner(tooManyPings: boolean) { if (this.disconnectHandled) { return; } this.disconnectHandled = true; this.disconnectListeners.forEach(listener => listener(tooManyPings)); - for (const call of this.activeCalls) { - call.onDisconnect(); - } + } + + /** + * Handle connection drops, but not GOAWAYs. + */ + private handleDisconnect() { + this.reportDisconnectToOwner(false); + /* Give calls an event loop cycle to finish naturally before reporting the + * disconnnection to them. */ + setImmediate(() => { + for (const call of this.activeCalls) { + call.onDisconnect(); + } + }); } addDisconnectListener(listener: TransportDisconnectListener): void { @@ -294,7 +314,7 @@ class Http2Transport implements Transport { if (!this.keepaliveTimeoutId) { this.keepaliveTimeoutId = setTimeout(() => { this.keepaliveTrace('Ping timeout passed without response'); - this.handleDisconnect(false); + this.handleDisconnect(); }, this.keepaliveTimeoutMs); this.keepaliveTimeoutId.unref?.(); } @@ -308,7 +328,7 @@ class Http2Transport implements Transport { } catch (e) { /* If we fail to send a ping, the connection is no longer functional, so * we should discard it. */ - this.handleDisconnect(false); + this.handleDisconnect(); } } @@ -365,7 +385,7 @@ class Http2Transport implements Transport { try { http2Stream = this.session!.request(headers); } catch (e) { - this.handleDisconnect(false); + this.handleDisconnect(); throw e; } this.flowControlTrace( diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index 0c0ba168..c67ebc4d 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -27,9 +27,9 @@ import * as grpc from '../src'; import { Server, ServerCredentials } from '../src'; import { ServiceError } from '../src/call'; import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; -import { sendUnaryData, ServerUnaryCall } from '../src/server-call'; +import { sendUnaryData, ServerUnaryCall, ServerDuplexStream } from '../src/server-call'; -import { loadProtoFile } from './common'; +import { assert2, loadProtoFile } from './common'; import { TestServiceClient, TestServiceHandlers } from './generated/TestService'; import { ProtoGrpcType as TestServiceGrpcType } from './generated/test_service'; import { Request__Output } from './generated/Request'; @@ -458,18 +458,28 @@ describe('Server', () => { describe('Echo service', () => { let server: Server; let client: ServiceClient; + const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); + const echoService = loadProtoFile(protoFile) + .EchoService as ServiceClientConstructor; + + const serviceImplementation = { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + }, + echoBidiStream(call: ServerDuplexStream) { + call.on('data', data => { + call.write(data); + }); + call.on('end', () => { + call.end(); + }); + } + }; before(done => { - const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); - const echoService = loadProtoFile(protoFile) - .EchoService as ServiceClientConstructor; server = new Server(); - server.addService(echoService.service, { - echo(call: ServerUnaryCall, callback: sendUnaryData) { - callback(null, call.request); - }, - }); + server.addService(echoService.service, serviceImplementation); server.bindAsync( 'localhost:0', @@ -501,6 +511,43 @@ describe('Echo service', () => { } ); }); + + /* This test passes on Node 18 but fails on Node 16. The failure appears to + * be caused by https://github.com/nodejs/node/issues/42713 */ + it.skip('should continue a stream after server shutdown', done => { + const server2 = new Server(); + server2.addService(echoService.service, serviceImplementation); + server2.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => { + if (err) { + done(err); + return; + } + const client2 = new echoService(`localhost:${port}`, grpc.credentials.createInsecure()); + server2.start(); + const stream = client2.echoBidiStream(); + const totalMessages = 5; + let messagesSent = 0; + stream.write({ value: 'test value', value2: messagesSent}); + messagesSent += 1; + stream.on('data', () => { + if (messagesSent === 1) { + server2.tryShutdown(assert2.mustCall(() => {})); + } + if (messagesSent >= totalMessages) { + stream.end(); + } else { + stream.write({ value: 'test value', value2: messagesSent}); + messagesSent += 1; + } + }); + stream.on('status', assert2.mustCall((status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(messagesSent, totalMessages); + })); + stream.on('error', () => {}); + assert2.afterMustCallsSatisfied(done); + }); + }); }); describe('Generic client and server', () => {