Merge pull request #3032 from murgatroid99/grpc-js_retry_half_close_1.14

Backport "Send halfClose immediately after messages to prevent late halfClose issues with Envoy" to 1.14.x
This commit is contained in:
Michael Lumish 2025-12-11 07:13:27 -08:00 committed by GitHub
commit ccd29b27d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 69 additions and 18 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.14.2",
"version": "1.14.3",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -760,11 +760,10 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
this.maybeStartHedgingTimer();
}
private handleChildWriteCompleted(childIndex: number) {
const childCall = this.underlyingCalls[childIndex];
const messageIndex = childCall.nextMessageToSend;
private handleChildWriteCompleted(childIndex: number, messageIndex: number) {
this.getBufferEntry(messageIndex).callback?.();
this.clearSentMessages();
const childCall = this.underlyingCalls[childIndex];
childCall.nextMessageToSend += 1;
this.sendNextChildMessage(childIndex);
}
@ -774,19 +773,33 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
if (childCall.state === 'COMPLETED') {
return;
}
if (this.getBufferEntry(childCall.nextMessageToSend)) {
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
const messageIndex = childCall.nextMessageToSend;
if (this.getBufferEntry(messageIndex)) {
const bufferEntry = this.getBufferEntry(messageIndex);
switch (bufferEntry.entryType) {
case 'MESSAGE':
childCall.call.sendMessageWithContext(
{
callback: error => {
// Ignore error
this.handleChildWriteCompleted(childIndex);
this.handleChildWriteCompleted(childIndex, messageIndex);
},
},
bufferEntry.message!.message
);
// Optimization: if the next entry is HALF_CLOSE, send it immediately
// without waiting for the message callback. This is safe because the message
// has already been passed to the underlying transport.
const nextEntry = this.getBufferEntry(messageIndex + 1);
if (nextEntry.entryType === 'HALF_CLOSE') {
this.trace(
'Sending halfClose immediately after message to child [' +
childCall.call.getCallNumber() +
'] - optimizing for unary/final message'
);
childCall.nextMessageToSend += 1;
childCall.call.halfClose();
}
break;
case 'HALF_CLOSE':
childCall.nextMessageToSend += 1;
@ -813,7 +826,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
};
this.writeBuffer.push(bufferEntry);
if (bufferEntry.allocated) {
context.callback?.();
// Run this in next tick to avoid suspending the current execution context
// otherwise it might cause half closing the call before sending message
process.nextTick(() => {
context.callback?.();
});
for (const [callIndex, call] of this.underlyingCalls.entries()) {
if (
call.state === 'ACTIVE' &&
@ -823,7 +840,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
{
callback: error => {
// Ignore error
this.handleChildWriteCompleted(callIndex);
this.handleChildWriteCompleted(callIndex, messageIndex);
},
},
message
@ -843,7 +860,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
{
callback: error => {
// Ignore error
this.handleChildWriteCompleted(this.committedCallIndex!);
this.handleChildWriteCompleted(this.committedCallIndex!, messageIndex);
},
},
message
@ -868,12 +885,21 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
allocated: false,
});
for (const call of this.underlyingCalls) {
if (
call?.state === 'ACTIVE' &&
call.nextMessageToSend === halfCloseIndex
) {
call.nextMessageToSend += 1;
call.call.halfClose();
if (call?.state === 'ACTIVE') {
// Send halfClose to call when either:
// - nextMessageToSend === halfCloseIndex - 1: last message sent, callback pending (optimization)
// - nextMessageToSend === halfCloseIndex: all messages sent and acknowledged
if (call.nextMessageToSend === halfCloseIndex
|| call.nextMessageToSend === halfCloseIndex - 1) {
this.trace(
'Sending halfClose immediately to child [' +
call.call.getCallNumber() +
'] - all messages already sent'
);
call.nextMessageToSend += 1;
call.call.halfClose();
}
// Otherwise, halfClose will be sent by sendNextChildMessage when message callbacks complete
}
}
}
@ -895,4 +921,4 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
return null;
}
}
}
}

View File

@ -18,7 +18,7 @@
import * as assert from 'assert';
import * as path from 'path';
import { loadProtoFile } from './common';
import { Metadata, Server, ServerDuplexStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, experimental, sendUnaryData } from '../src';
import { Metadata, Server, ServerCredentials, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, credentials, experimental, sendUnaryData } from '../src';
import { ServiceClient } from '../src/make-client';
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
@ -36,6 +36,15 @@ const echoServiceImplementation = {
call.end();
});
},
echoClientStream(call: ServerReadableStream<any, any>, callback: sendUnaryData<any>) {
const messages: any[] = [];
call.on('data', (message: any) => {
messages.push(message);
});
call.on('end', () => {
callback(null, { value: messages.map(m => m.value).join(','), value2: messages.length });
});
},
};
describe('Client should successfully communicate with server', () => {
@ -77,4 +86,20 @@ describe('Client should successfully communicate with server', () => {
});
});
}).timeout(5000);
it('Client streaming with one message should work', done => {
server = new Server();
server.addService(EchoService.service, echoServiceImplementation);
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (error, port) => {
assert.ifError(error);
client = new EchoService(`localhost:${port}`, credentials.createInsecure());
const call = client.echoClientStream((error: ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 1 });
done();
});
call.write({ value: 'test value', value2: 42 });
call.end();
});
});
});