diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 854000a3..510eac61 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.4.4", + "version": "1.4.6", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index f705f6ca..f2e045ae 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -149,6 +149,9 @@ export function isInterceptingListener( } export class InterceptingListenerImpl implements InterceptingListener { + private processingMetadata = false; + private hasPendingMessage = false; + private pendingMessage: any; private processingMessage = false; private pendingStatus: StatusObject | null = null; constructor( @@ -156,9 +159,27 @@ export class InterceptingListenerImpl implements InterceptingListener { private nextListener: InterceptingListener ) {} + private processPendingMessage() { + if (this.hasPendingMessage) { + this.nextListener.onReceiveMessage(this.pendingMessage); + this.pendingMessage = null; + this.hasPendingMessage = false; + } + } + + private processPendingStatus() { + if (this.pendingStatus) { + this.nextListener.onReceiveStatus(this.pendingStatus); + } + } + onReceiveMetadata(metadata: Metadata): void { + this.processingMetadata = true; this.listener.onReceiveMetadata(metadata, (metadata) => { + this.processingMetadata = false; this.nextListener.onReceiveMetadata(metadata); + this.processPendingMessage(); + this.processPendingStatus(); }); } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -168,15 +189,18 @@ export class InterceptingListenerImpl implements InterceptingListener { this.processingMessage = true; this.listener.onReceiveMessage(message, (msg) => { this.processingMessage = false; - this.nextListener.onReceiveMessage(msg); - if (this.pendingStatus) { - this.nextListener.onReceiveStatus(this.pendingStatus); + if (this.processingMetadata) { + this.pendingMessage = msg; + this.hasPendingMessage = true; + } else { + this.nextListener.onReceiveMessage(msg); + this.processPendingStatus(); } }); } onReceiveStatus(status: StatusObject): void { this.listener.onReceiveStatus(status, (processedStatus) => { - if (this.processingMessage) { + if (this.processingMetadata || this.processingMessage) { this.pendingStatus = processedStatus; } else { this.nextListener.onReceiveStatus(processedStatus); @@ -283,7 +307,7 @@ export class Http2CallStream implements Call { private outputStatus() { /* Precondition: this.finalStatus !== null */ - if (!this.statusOutput) { + if (this.listener && !this.statusOutput) { this.statusOutput = true; const filteredStatus = this.filterStack.receiveTrailers( this.finalStatus! @@ -692,6 +716,7 @@ export class Http2CallStream implements Call { this.trace('Sending metadata'); this.listener = listener; this.channel._startCallStream(this, metadata); + this.maybeOutputStatus(); } private destroyHttp2Stream() { diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts index 5dfbeba1..ddb296ff 100644 --- a/packages/grpc-js/src/client-interceptors.ts +++ b/packages/grpc-js/src/client-interceptors.ts @@ -208,8 +208,18 @@ export class InterceptingCall implements InterceptingCallInterface { */ private requester: FullRequester; /** - * Indicates that a message has been passed to the listener's onReceiveMessage - * method it has not been passed to the corresponding next callback + * Indicates that metadata has been passed to the requester's start + * method but it has not been passed to the corresponding next callback + */ + private processingMetadata = false; + /** + * Message context for a pending message that is waiting for + */ + private pendingMessageContext: MessageContext | null = null; + private pendingMessage: any; + /** + * Indicates that a message has been passed to the requester's sendMessage + * method but it has not been passed to the corresponding next callback */ private processingMessage = false; /** @@ -242,6 +252,21 @@ export class InterceptingCall implements InterceptingCallInterface { getPeer() { return this.nextCall.getPeer(); } + + private processPendingMessage() { + if (this.pendingMessageContext) { + this.nextCall.sendMessageWithContext(this.pendingMessageContext, this.pendingMessage); + this.pendingMessageContext = null; + this.pendingMessage = null; + } + } + + private processPendingHalfClose() { + if (this.pendingHalfClose) { + this.nextCall.halfClose(); + } + } + start( metadata: Metadata, interceptingListener?: Partial @@ -257,7 +282,9 @@ export class InterceptingCall implements InterceptingCallInterface { interceptingListener?.onReceiveStatus?.bind(interceptingListener) ?? ((status) => {}), }; + this.processingMetadata = true; this.requester.start(metadata, fullInterceptingListener, (md, listener) => { + this.processingMetadata = false; let finalInterceptingListener: InterceptingListener; if (isInterceptingListener(listener)) { finalInterceptingListener = listener; @@ -276,6 +303,8 @@ export class InterceptingCall implements InterceptingCallInterface { ); } this.nextCall.start(md, finalInterceptingListener); + this.processPendingMessage(); + this.processPendingHalfClose(); }); } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -283,9 +312,12 @@ export class InterceptingCall implements InterceptingCallInterface { this.processingMessage = true; this.requester.sendMessage(message, (finalMessage) => { this.processingMessage = false; - this.nextCall.sendMessageWithContext(context, finalMessage); - if (this.pendingHalfClose) { - this.nextCall.halfClose(); + if (this.processingMetadata) { + this.pendingMessageContext = context; + this.pendingMessage = message; + } else { + this.nextCall.sendMessageWithContext(context, finalMessage); + this.processPendingHalfClose(); } }); } @@ -298,7 +330,7 @@ export class InterceptingCall implements InterceptingCallInterface { } halfClose(): void { this.requester.halfClose(() => { - if (this.processingMessage) { + if (this.processingMetadata || this.processingMessage) { this.pendingHalfClose = true; } else { this.nextCall.halfClose(); diff --git a/packages/grpc-js/src/object-stream.ts b/packages/grpc-js/src/object-stream.ts index b17058a7..22ab8a41 100644 --- a/packages/grpc-js/src/object-stream.ts +++ b/packages/grpc-js/src/object-stream.ts @@ -36,9 +36,9 @@ export interface IntermediateObjectWritable extends Writable { write(chunk: any & T, cb?: WriteCallback): boolean; write(chunk: any & T, encoding?: any, cb?: WriteCallback): boolean; setDefaultEncoding(encoding: string): this; - end(): void; - end(chunk: any & T, cb?: Function): void; - end(chunk: any & T, encoding?: any, cb?: Function): void; + end(): ReturnType extends Writable ? this : void; + end(chunk: any & T, cb?: Function): ReturnType extends Writable ? this : void; + end(chunk: any & T, encoding?: any, cb?: Function): ReturnType extends Writable ? this : void; } export interface ObjectWritable extends IntermediateObjectWritable { @@ -46,20 +46,7 @@ export interface ObjectWritable extends IntermediateObjectWritable { write(chunk: T, cb?: Function): boolean; write(chunk: T, encoding?: any, cb?: Function): boolean; setDefaultEncoding(encoding: string): this; - end(): void; - end(chunk: T, cb?: Function): void; - end(chunk: T, encoding?: any, cb?: Function): void; + end(): ReturnType extends Writable ? this : void; + end(chunk: T, cb?: Function): ReturnType extends Writable ? this : void; + end(chunk: T, encoding?: any, cb?: Function): ReturnType extends Writable ? this : void; } - -export type ObjectDuplex = { - read(size?: number): U; - - _write(chunk: T, encoding: string, callback: Function): void; - write(chunk: T, cb?: Function): boolean; - write(chunk: T, encoding?: any, cb?: Function): boolean; - end(): void; - end(chunk: T, cb?: Function): void; - end(chunk: T, encoding?: any, cb?: Function): void; -} & Duplex & - ObjectWritable & - ObjectReadable; diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 235748f7..5bc86128 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -26,7 +26,7 @@ import { ConnectivityState } from './connectivity-state'; import { ConfigSelector, createResolver, Resolver } from './resolver'; import { ServiceError } from './call'; import { Picker, UnavailablePicker, QueuePicker } from './picker'; -import { BackoffTimeout } from './backoff-timeout'; +import { BackoffOptions, BackoffTimeout } from './backoff-timeout'; import { Status } from './constants'; import { StatusObject } from './call-stream'; import { Metadata } from './metadata'; @@ -248,7 +248,10 @@ export class ResolvingLoadBalancer implements LoadBalancer { }, channelOptions ); - + const backoffOptions: BackoffOptions = { + initialDelay: channelOptions['grpc.initial_reconnect_backoff_ms'], + maxDelay: channelOptions['grpc.max_reconnect_backoff_ms'], + }; this.backoffTimeout = new BackoffTimeout(() => { if (this.continueResolving) { this.updateResolution(); @@ -256,7 +259,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { } else { this.updateState(this.latestChildState, this.latestChildPicker); } - }); + }, backoffOptions); this.backoffTimeout.unref(); } diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 78cab5a9..315c9d9a 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -238,7 +238,7 @@ export class ServerWritableStreamImpl this.trailingMetadata = metadata; } - super.end(); + return super.end(); } } @@ -285,7 +285,7 @@ export class ServerDuplexStreamImpl this.trailingMetadata = metadata; } - super.end(); + return super.end(); } } @@ -295,7 +295,6 @@ ServerDuplexStreamImpl.prototype._write = ServerWritableStreamImpl.prototype._write; ServerDuplexStreamImpl.prototype._final = ServerWritableStreamImpl.prototype._final; -ServerDuplexStreamImpl.prototype.end = ServerWritableStreamImpl.prototype.end; // Unary response callback signature. export type sendUnaryData = (