diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 709d20d4..78cab5a9 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -737,9 +737,24 @@ export class Http2ServerCallStream< ) { const decoder = new StreamDecoder(); + let readsDone = false; + + let pendingMessageProcessing = false; + + let pushedEnd = false; + + const maybePushEnd = () => { + if (!pushedEnd && readsDone && !pendingMessageProcessing) { + pushedEnd = true; + this.pushOrBufferMessage(readable, null); + } + } + this.stream.on('data', async (data: Buffer) => { const messages = decoder.write(data); + pendingMessageProcessing = true; + this.stream.pause(); for (const message of messages) { if ( this.maxReceiveMessageSize !== -1 && @@ -763,10 +778,14 @@ export class Http2ServerCallStream< this.pushOrBufferMessage(readable, decompressedMessage); } + pendingMessageProcessing = false; + this.stream.resume(); + maybePushEnd(); }); this.stream.once('end', () => { - this.pushOrBufferMessage(readable, null); + readsDone = true; + maybePushEnd(); }); } @@ -810,6 +829,7 @@ export class Http2ServerCallStream< messageBytes: Buffer | null ) { if (messageBytes === null) { + trace('Received end of stream'); if (this.canPush) { readable.push(null); } else { @@ -819,6 +839,8 @@ export class Http2ServerCallStream< return; } + trace('Received message of length ' + messageBytes.length); + this.isPushPending = true; try {