mirror of
https://github.com/grpc/grpc-node.git
synced 2026-01-25 14:27:06 +00:00
remove halfCloseSent field
This commit is contained in:
parent
f6895cbf40
commit
88a083d4bc
@ -128,10 +128,6 @@ interface UnderlyingCall {
|
||||
* This is different from nextMessageToSend which tracks completion/acknowledgment.
|
||||
*/
|
||||
highestSentMessageIndex: number;
|
||||
/**
|
||||
* Tracks whether halfClose has been sent to this child call.
|
||||
*/
|
||||
halfCloseSent: boolean;
|
||||
startTime: Date;
|
||||
}
|
||||
|
||||
@ -705,7 +701,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
|
||||
call: child,
|
||||
nextMessageToSend: 0,
|
||||
highestSentMessageIndex: -1,
|
||||
halfCloseSent: false,
|
||||
startTime: new Date(),
|
||||
});
|
||||
const previousAttempts = this.attempts - 1;
|
||||
@ -771,11 +766,10 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
|
||||
this.maybeStartHedgingTimer();
|
||||
}
|
||||
|
||||
private handleChildWriteCompleted(childIndex: number) {
|
||||
const childCall = this.underlyingCalls[childIndex];
|
||||
const messageIndex = childCall.nextMessageToSend;
|
||||
private handleChildWriteCompleted(childIndex: number, messageIndex: number) {
|
||||
this.getBufferEntry(messageIndex).callback?.();
|
||||
this.clearSentMessages();
|
||||
const childCall = this.underlyingCalls[childIndex];
|
||||
childCall.nextMessageToSend += 1;
|
||||
this.sendNextChildMessage(childIndex);
|
||||
}
|
||||
@ -785,40 +779,38 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
|
||||
if (childCall.state === 'COMPLETED') {
|
||||
return;
|
||||
}
|
||||
if (this.getBufferEntry(childCall.nextMessageToSend)) {
|
||||
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
|
||||
const messageIndex = childCall.nextMessageToSend;
|
||||
if (this.getBufferEntry(messageIndex)) {
|
||||
const bufferEntry = this.getBufferEntry(messageIndex);
|
||||
switch (bufferEntry.entryType) {
|
||||
case 'MESSAGE':
|
||||
childCall.highestSentMessageIndex = childCall.nextMessageToSend;
|
||||
childCall.call.sendMessageWithContext(
|
||||
{
|
||||
callback: error => {
|
||||
// Ignore error
|
||||
this.handleChildWriteCompleted(childIndex);
|
||||
this.handleChildWriteCompleted(childIndex, messageIndex);
|
||||
},
|
||||
},
|
||||
bufferEntry.message!.message
|
||||
);
|
||||
childCall.highestSentMessageIndex = messageIndex;
|
||||
// Optimization: if the next entry is HALF_CLOSE, send it immediately
|
||||
// without waiting for the message callback. This is safe because the message
|
||||
// has already been passed to the underlying transport.
|
||||
const nextEntry = this.getBufferEntry(childCall.nextMessageToSend + 1);
|
||||
if (nextEntry.entryType === 'HALF_CLOSE' && !childCall.halfCloseSent) {
|
||||
const nextEntry = this.getBufferEntry(messageIndex + 1);
|
||||
if (nextEntry.entryType === 'HALF_CLOSE') {
|
||||
this.trace(
|
||||
'Sending halfClose immediately after message to child [' +
|
||||
childCall.call.getCallNumber() +
|
||||
'] - optimizing for unary/final message'
|
||||
);
|
||||
childCall.halfCloseSent = true;
|
||||
childCall.nextMessageToSend += 1;
|
||||
childCall.call.halfClose();
|
||||
}
|
||||
break;
|
||||
case 'HALF_CLOSE':
|
||||
if (!childCall.halfCloseSent) {
|
||||
childCall.nextMessageToSend += 1;
|
||||
childCall.halfCloseSent = true;
|
||||
childCall.call.halfClose();
|
||||
}
|
||||
childCall.nextMessageToSend += 1;
|
||||
childCall.call.halfClose();
|
||||
break;
|
||||
case 'FREED':
|
||||
// Should not be possible
|
||||
@ -847,16 +839,16 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
|
||||
call.state === 'ACTIVE' &&
|
||||
call.nextMessageToSend === messageIndex
|
||||
) {
|
||||
call.highestSentMessageIndex = messageIndex;
|
||||
call.call.sendMessageWithContext(
|
||||
{
|
||||
callback: error => {
|
||||
// Ignore error
|
||||
this.handleChildWriteCompleted(callIndex);
|
||||
this.handleChildWriteCompleted(callIndex, messageIndex);
|
||||
},
|
||||
},
|
||||
message
|
||||
);
|
||||
call.highestSentMessageIndex = messageIndex;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -873,7 +865,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
|
||||
{
|
||||
callback: error => {
|
||||
// Ignore error
|
||||
this.handleChildWriteCompleted(this.committedCallIndex!);
|
||||
this.handleChildWriteCompleted(this.committedCallIndex!, messageIndex);
|
||||
},
|
||||
},
|
||||
message
|
||||
@ -898,19 +890,18 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
|
||||
allocated: false,
|
||||
});
|
||||
for (const call of this.underlyingCalls) {
|
||||
if (call?.state === 'ACTIVE' && !call.halfCloseSent) {
|
||||
if (call?.state === 'ACTIVE') {
|
||||
// Send halfClose immediately if all messages have been sent to this call
|
||||
// We check highestSentMessageIndex >= halfCloseIndex - 1 because:
|
||||
// - If halfCloseIndex is 0, there are no messages, so send immediately
|
||||
// - If halfCloseIndex is N, the last message is at index N-1
|
||||
// - If highestSentMessageIndex >= N-1, all messages have been sent
|
||||
if (halfCloseIndex === 0 || call.highestSentMessageIndex >= halfCloseIndex - 1) {
|
||||
if (call.highestSentMessageIndex >= halfCloseIndex - 1) {
|
||||
this.trace(
|
||||
'Sending halfClose immediately to child [' +
|
||||
call.call.getCallNumber() +
|
||||
'] - all messages already sent'
|
||||
);
|
||||
call.halfCloseSent = true;
|
||||
call.nextMessageToSend += 1;
|
||||
call.call.halfClose();
|
||||
}
|
||||
// Otherwise, halfClose will be sent by sendNextChildMessage when messages complete
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user