From 7cccc392181050f1e7ed7334f148d35dbe6b7b5f Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 9 Dec 2021 16:14:52 -0500 Subject: [PATCH 1/7] grpc-js: Preserve order of metadata and messages with async interceptors --- packages/grpc-js/src/call-stream.ts | 35 +++++++++++++--- packages/grpc-js/src/client-interceptors.ts | 44 ++++++++++++++++++--- 2 files changed, 68 insertions(+), 11 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 70a9ac6e..915d812f 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -149,6 +149,9 @@ export function isInterceptingListener( } export class InterceptingListenerImpl implements InterceptingListener { + private processingMetadata = false; + private hasPendingMessage = false; + private pendingMessage: any; private processingMessage = false; private pendingStatus: StatusObject | null = null; constructor( @@ -156,9 +159,27 @@ export class InterceptingListenerImpl implements InterceptingListener { private nextListener: InterceptingListener ) {} + private processPendingMessage() { + if (this.hasPendingMessage) { + this.nextListener.onReceiveMessage(this.pendingMessage); + this.pendingMessage = null; + this.hasPendingMessage = false; + } + } + + private processPendingStatus() { + if (this.pendingStatus) { + this.nextListener.onReceiveStatus(this.pendingStatus); + } + } + onReceiveMetadata(metadata: Metadata): void { + this.processingMetadata = true; this.listener.onReceiveMetadata(metadata, (metadata) => { + this.processingMetadata = false; this.nextListener.onReceiveMetadata(metadata); + this.processPendingMessage(); + this.processPendingStatus(); }); } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -168,15 +189,18 @@ export class InterceptingListenerImpl implements InterceptingListener { this.processingMessage = true; this.listener.onReceiveMessage(message, (msg) => { this.processingMessage = false; - this.nextListener.onReceiveMessage(msg); - if (this.pendingStatus) { - this.nextListener.onReceiveStatus(this.pendingStatus); + if (this.processingMetadata) { + this.pendingMessage = msg; + this.hasPendingMessage = true; + } else { + this.nextListener.onReceiveMessage(msg); + this.processPendingStatus(); } }); } onReceiveStatus(status: StatusObject): void { this.listener.onReceiveStatus(status, (processedStatus) => { - if (this.processingMessage) { + if (this.processingMetadata || this.processingMessage) { this.pendingStatus = processedStatus; } else { this.nextListener.onReceiveStatus(processedStatus); @@ -283,7 +307,7 @@ export class Http2CallStream implements Call { private outputStatus() { /* Precondition: this.finalStatus !== null */ - if (!this.statusOutput) { + if (this.listener && !this.statusOutput) { this.statusOutput = true; const filteredStatus = this.filterStack.receiveTrailers( this.finalStatus! @@ -692,6 +716,7 @@ export class Http2CallStream implements Call { this.trace('Sending metadata'); this.listener = listener; this.channel._startCallStream(this, metadata); + this.maybeOutputStatus(); } private destroyHttp2Stream() { diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts index 5dfbeba1..ddb296ff 100644 --- a/packages/grpc-js/src/client-interceptors.ts +++ b/packages/grpc-js/src/client-interceptors.ts @@ -208,8 +208,18 @@ export class InterceptingCall implements InterceptingCallInterface { */ private requester: FullRequester; /** - * Indicates that a message has been passed to the listener's onReceiveMessage - * method it has not been passed to the corresponding next callback + * Indicates that metadata has been passed to the requester's start + * method but it has not been passed to the corresponding next callback + */ + private processingMetadata = false; + /** + * Message context for a pending message that is waiting for + */ + private pendingMessageContext: MessageContext | null = null; + private pendingMessage: any; + /** + * Indicates that a message has been passed to the requester's sendMessage + * method but it has not been passed to the corresponding next callback */ private processingMessage = false; /** @@ -242,6 +252,21 @@ export class InterceptingCall implements InterceptingCallInterface { getPeer() { return this.nextCall.getPeer(); } + + private processPendingMessage() { + if (this.pendingMessageContext) { + this.nextCall.sendMessageWithContext(this.pendingMessageContext, this.pendingMessage); + this.pendingMessageContext = null; + this.pendingMessage = null; + } + } + + private processPendingHalfClose() { + if (this.pendingHalfClose) { + this.nextCall.halfClose(); + } + } + start( metadata: Metadata, interceptingListener?: Partial @@ -257,7 +282,9 @@ export class InterceptingCall implements InterceptingCallInterface { interceptingListener?.onReceiveStatus?.bind(interceptingListener) ?? ((status) => {}), }; + this.processingMetadata = true; this.requester.start(metadata, fullInterceptingListener, (md, listener) => { + this.processingMetadata = false; let finalInterceptingListener: InterceptingListener; if (isInterceptingListener(listener)) { finalInterceptingListener = listener; @@ -276,6 +303,8 @@ export class InterceptingCall implements InterceptingCallInterface { ); } this.nextCall.start(md, finalInterceptingListener); + this.processPendingMessage(); + this.processPendingHalfClose(); }); } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -283,9 +312,12 @@ export class InterceptingCall implements InterceptingCallInterface { this.processingMessage = true; this.requester.sendMessage(message, (finalMessage) => { this.processingMessage = false; - this.nextCall.sendMessageWithContext(context, finalMessage); - if (this.pendingHalfClose) { - this.nextCall.halfClose(); + if (this.processingMetadata) { + this.pendingMessageContext = context; + this.pendingMessage = message; + } else { + this.nextCall.sendMessageWithContext(context, finalMessage); + this.processPendingHalfClose(); } }); } @@ -298,7 +330,7 @@ export class InterceptingCall implements InterceptingCallInterface { } halfClose(): void { this.requester.halfClose(() => { - if (this.processingMessage) { + if (this.processingMetadata || this.processingMessage) { this.pendingHalfClose = true; } else { this.nextCall.halfClose(); From 86f3ffd96ce7e656acd004caf10c1c6c98ceb84c Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 9 Dec 2021 16:31:29 -0500 Subject: [PATCH 2/7] grpc-js: Update version to 1.4.5 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index e14ffa0e..02400ca9 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.4.4", + "version": "1.4.5", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", From b12330abfd8ff9207a332e8a47e05703c26697f5 Mon Sep 17 00:00:00 2001 From: Cosmin-Catalin Crisan Date: Fri, 10 Dec 2021 20:20:11 +0200 Subject: [PATCH 3/7] grpc-js: Send backoffOptions to BackoffTimeout --- packages/grpc-js/src/resolving-load-balancer.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 235748f7..5bc86128 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -26,7 +26,7 @@ import { ConnectivityState } from './connectivity-state'; import { ConfigSelector, createResolver, Resolver } from './resolver'; import { ServiceError } from './call'; import { Picker, UnavailablePicker, QueuePicker } from './picker'; -import { BackoffTimeout } from './backoff-timeout'; +import { BackoffOptions, BackoffTimeout } from './backoff-timeout'; import { Status } from './constants'; import { StatusObject } from './call-stream'; import { Metadata } from './metadata'; @@ -248,7 +248,10 @@ export class ResolvingLoadBalancer implements LoadBalancer { }, channelOptions ); - + const backoffOptions: BackoffOptions = { + initialDelay: channelOptions['grpc.initial_reconnect_backoff_ms'], + maxDelay: channelOptions['grpc.max_reconnect_backoff_ms'], + }; this.backoffTimeout = new BackoffTimeout(() => { if (this.continueResolving) { this.updateResolution(); @@ -256,7 +259,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { } else { this.updateState(this.latestChildState, this.latestChildPicker); } - }); + }, backoffOptions); this.backoffTimeout.unref(); } From a000d67a8869585d23d8e9f0a50195c5b364a240 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 29 Dec 2021 11:58:32 -0800 Subject: [PATCH 4/7] Use xds-test-server-5 as interop server image --- packages/grpc-js-xds/scripts/xds.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/scripts/xds.sh b/packages/grpc-js-xds/scripts/xds.sh index a06cde84..b584c21c 100755 --- a/packages/grpc-js-xds/scripts/xds.sh +++ b/packages/grpc-js-xds/scripts/xds.sh @@ -54,7 +54,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh python3 grpc/tools/run_tests/run_xds_tests.py \ --test_case="all,timeout,circuit_breaking,fault_injection" \ --project_id=grpc-testing \ - --source_image=projects/grpc-testing/global/images/xds-test-server-4 \ + --source_image=projects/grpc-testing/global/images/xds-test-server-5 \ --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ --gcp_suffix=$(date '+%s') \ --verbose \ From 311d22e03e5818260feedaaf233f86cc55ed581b Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 4 Jan 2022 12:41:41 -0800 Subject: [PATCH 5/7] grpc-js: Fix compatibility with @types/node 17.0.6 --- packages/grpc-js/src/object-stream.ts | 25 ++++++------------------- packages/grpc-js/src/server-call.ts | 5 ++--- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/packages/grpc-js/src/object-stream.ts b/packages/grpc-js/src/object-stream.ts index b17058a7..04282092 100644 --- a/packages/grpc-js/src/object-stream.ts +++ b/packages/grpc-js/src/object-stream.ts @@ -36,9 +36,9 @@ export interface IntermediateObjectWritable extends Writable { write(chunk: any & T, cb?: WriteCallback): boolean; write(chunk: any & T, encoding?: any, cb?: WriteCallback): boolean; setDefaultEncoding(encoding: string): this; - end(): void; - end(chunk: any & T, cb?: Function): void; - end(chunk: any & T, encoding?: any, cb?: Function): void; + end(): this; + end(chunk: any & T, cb?: Function): this; + end(chunk: any & T, encoding?: any, cb?: Function): this; } export interface ObjectWritable extends IntermediateObjectWritable { @@ -46,20 +46,7 @@ export interface ObjectWritable extends IntermediateObjectWritable { write(chunk: T, cb?: Function): boolean; write(chunk: T, encoding?: any, cb?: Function): boolean; setDefaultEncoding(encoding: string): this; - end(): void; - end(chunk: T, cb?: Function): void; - end(chunk: T, encoding?: any, cb?: Function): void; + end(): this; + end(chunk: T, cb?: Function): this; + end(chunk: T, encoding?: any, cb?: Function): this; } - -export type ObjectDuplex = { - read(size?: number): U; - - _write(chunk: T, encoding: string, callback: Function): void; - write(chunk: T, cb?: Function): boolean; - write(chunk: T, encoding?: any, cb?: Function): boolean; - end(): void; - end(chunk: T, cb?: Function): void; - end(chunk: T, encoding?: any, cb?: Function): void; -} & Duplex & - ObjectWritable & - ObjectReadable; diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index be4429b0..a79e401c 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -236,7 +236,7 @@ export class ServerWritableStreamImpl this.trailingMetadata = metadata; } - super.end(); + return super.end(); } } @@ -282,7 +282,7 @@ export class ServerDuplexStreamImpl this.trailingMetadata = metadata; } - super.end(); + return super.end(); } } @@ -292,7 +292,6 @@ ServerDuplexStreamImpl.prototype._write = ServerWritableStreamImpl.prototype._write; ServerDuplexStreamImpl.prototype._final = ServerWritableStreamImpl.prototype._final; -ServerDuplexStreamImpl.prototype.end = ServerWritableStreamImpl.prototype.end; // Unary response callback signature. export type sendUnaryData = ( From e2dfb8fbcf0c7f0282e7becfaced75fdef20bb8f Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 4 Jan 2022 12:42:05 -0800 Subject: [PATCH 6/7] grpc-js: Increase version to 1.4.6 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 02400ca9..e3c98344 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.4.5", + "version": "1.4.6", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", From fba2b9498f7ebf59cc1303dc39fcaf14b5cd3471 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 4 Jan 2022 14:00:25 -0800 Subject: [PATCH 7/7] Fix end type again for older @types/node versions --- packages/grpc-js/src/object-stream.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/grpc-js/src/object-stream.ts b/packages/grpc-js/src/object-stream.ts index 04282092..22ab8a41 100644 --- a/packages/grpc-js/src/object-stream.ts +++ b/packages/grpc-js/src/object-stream.ts @@ -36,9 +36,9 @@ export interface IntermediateObjectWritable extends Writable { write(chunk: any & T, cb?: WriteCallback): boolean; write(chunk: any & T, encoding?: any, cb?: WriteCallback): boolean; setDefaultEncoding(encoding: string): this; - end(): this; - end(chunk: any & T, cb?: Function): this; - end(chunk: any & T, encoding?: any, cb?: Function): this; + end(): ReturnType extends Writable ? this : void; + end(chunk: any & T, cb?: Function): ReturnType extends Writable ? this : void; + end(chunk: any & T, encoding?: any, cb?: Function): ReturnType extends Writable ? this : void; } export interface ObjectWritable extends IntermediateObjectWritable { @@ -46,7 +46,7 @@ export interface ObjectWritable extends IntermediateObjectWritable { write(chunk: T, cb?: Function): boolean; write(chunk: T, encoding?: any, cb?: Function): boolean; setDefaultEncoding(encoding: string): this; - end(): this; - end(chunk: T, cb?: Function): this; - end(chunk: T, encoding?: any, cb?: Function): this; + end(): ReturnType extends Writable ? this : void; + end(chunk: T, cb?: Function): ReturnType extends Writable ? this : void; + end(chunk: T, encoding?: any, cb?: Function): ReturnType extends Writable ? this : void; }