From 40c2f61eba518970210ec857cb067f069b907bf2 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 8 Dec 2021 10:27:21 -0500 Subject: [PATCH] Fix server decompression sequencing, add tracing --- packages/grpc-js/src/server-call.ts | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 709d20d4..78cab5a9 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -737,9 +737,24 @@ export class Http2ServerCallStream< ) { const decoder = new StreamDecoder(); + let readsDone = false; + + let pendingMessageProcessing = false; + + let pushedEnd = false; + + const maybePushEnd = () => { + if (!pushedEnd && readsDone && !pendingMessageProcessing) { + pushedEnd = true; + this.pushOrBufferMessage(readable, null); + } + } + this.stream.on('data', async (data: Buffer) => { const messages = decoder.write(data); + pendingMessageProcessing = true; + this.stream.pause(); for (const message of messages) { if ( this.maxReceiveMessageSize !== -1 && @@ -763,10 +778,14 @@ export class Http2ServerCallStream< this.pushOrBufferMessage(readable, decompressedMessage); } + pendingMessageProcessing = false; + this.stream.resume(); + maybePushEnd(); }); this.stream.once('end', () => { - this.pushOrBufferMessage(readable, null); + readsDone = true; + maybePushEnd(); }); } @@ -810,6 +829,7 @@ export class Http2ServerCallStream< messageBytes: Buffer | null ) { if (messageBytes === null) { + trace('Received end of stream'); if (this.canPush) { readable.push(null); } else { @@ -819,6 +839,8 @@ export class Http2ServerCallStream< return; } + trace('Received message of length ' + messageBytes.length); + this.isPushPending = true; try {