mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
Merge pull request #2008 from grpc/@grpc/grpc-js@1.4.x
Upmerge more changes from @grpc/grpc js@1.4.x
This commit is contained in:
commit
0dbcbcc320
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@grpc/grpc-js",
|
||||
"version": "1.4.4",
|
||||
"version": "1.4.6",
|
||||
"description": "gRPC Library for Node - pure JS implementation",
|
||||
"homepage": "https://grpc.io/",
|
||||
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
|
||||
|
||||
@ -149,6 +149,9 @@ export function isInterceptingListener(
|
||||
}
|
||||
|
||||
export class InterceptingListenerImpl implements InterceptingListener {
|
||||
private processingMetadata = false;
|
||||
private hasPendingMessage = false;
|
||||
private pendingMessage: any;
|
||||
private processingMessage = false;
|
||||
private pendingStatus: StatusObject | null = null;
|
||||
constructor(
|
||||
@ -156,9 +159,27 @@ export class InterceptingListenerImpl implements InterceptingListener {
|
||||
private nextListener: InterceptingListener
|
||||
) {}
|
||||
|
||||
private processPendingMessage() {
|
||||
if (this.hasPendingMessage) {
|
||||
this.nextListener.onReceiveMessage(this.pendingMessage);
|
||||
this.pendingMessage = null;
|
||||
this.hasPendingMessage = false;
|
||||
}
|
||||
}
|
||||
|
||||
private processPendingStatus() {
|
||||
if (this.pendingStatus) {
|
||||
this.nextListener.onReceiveStatus(this.pendingStatus);
|
||||
}
|
||||
}
|
||||
|
||||
onReceiveMetadata(metadata: Metadata): void {
|
||||
this.processingMetadata = true;
|
||||
this.listener.onReceiveMetadata(metadata, (metadata) => {
|
||||
this.processingMetadata = false;
|
||||
this.nextListener.onReceiveMetadata(metadata);
|
||||
this.processPendingMessage();
|
||||
this.processPendingStatus();
|
||||
});
|
||||
}
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
@ -168,15 +189,18 @@ export class InterceptingListenerImpl implements InterceptingListener {
|
||||
this.processingMessage = true;
|
||||
this.listener.onReceiveMessage(message, (msg) => {
|
||||
this.processingMessage = false;
|
||||
this.nextListener.onReceiveMessage(msg);
|
||||
if (this.pendingStatus) {
|
||||
this.nextListener.onReceiveStatus(this.pendingStatus);
|
||||
if (this.processingMetadata) {
|
||||
this.pendingMessage = msg;
|
||||
this.hasPendingMessage = true;
|
||||
} else {
|
||||
this.nextListener.onReceiveMessage(msg);
|
||||
this.processPendingStatus();
|
||||
}
|
||||
});
|
||||
}
|
||||
onReceiveStatus(status: StatusObject): void {
|
||||
this.listener.onReceiveStatus(status, (processedStatus) => {
|
||||
if (this.processingMessage) {
|
||||
if (this.processingMetadata || this.processingMessage) {
|
||||
this.pendingStatus = processedStatus;
|
||||
} else {
|
||||
this.nextListener.onReceiveStatus(processedStatus);
|
||||
@ -283,7 +307,7 @@ export class Http2CallStream implements Call {
|
||||
|
||||
private outputStatus() {
|
||||
/* Precondition: this.finalStatus !== null */
|
||||
if (!this.statusOutput) {
|
||||
if (this.listener && !this.statusOutput) {
|
||||
this.statusOutput = true;
|
||||
const filteredStatus = this.filterStack.receiveTrailers(
|
||||
this.finalStatus!
|
||||
@ -692,6 +716,7 @@ export class Http2CallStream implements Call {
|
||||
this.trace('Sending metadata');
|
||||
this.listener = listener;
|
||||
this.channel._startCallStream(this, metadata);
|
||||
this.maybeOutputStatus();
|
||||
}
|
||||
|
||||
private destroyHttp2Stream() {
|
||||
|
||||
@ -208,8 +208,18 @@ export class InterceptingCall implements InterceptingCallInterface {
|
||||
*/
|
||||
private requester: FullRequester;
|
||||
/**
|
||||
* Indicates that a message has been passed to the listener's onReceiveMessage
|
||||
* method it has not been passed to the corresponding next callback
|
||||
* Indicates that metadata has been passed to the requester's start
|
||||
* method but it has not been passed to the corresponding next callback
|
||||
*/
|
||||
private processingMetadata = false;
|
||||
/**
|
||||
* Message context for a pending message that is waiting for
|
||||
*/
|
||||
private pendingMessageContext: MessageContext | null = null;
|
||||
private pendingMessage: any;
|
||||
/**
|
||||
* Indicates that a message has been passed to the requester's sendMessage
|
||||
* method but it has not been passed to the corresponding next callback
|
||||
*/
|
||||
private processingMessage = false;
|
||||
/**
|
||||
@ -242,6 +252,21 @@ export class InterceptingCall implements InterceptingCallInterface {
|
||||
getPeer() {
|
||||
return this.nextCall.getPeer();
|
||||
}
|
||||
|
||||
private processPendingMessage() {
|
||||
if (this.pendingMessageContext) {
|
||||
this.nextCall.sendMessageWithContext(this.pendingMessageContext, this.pendingMessage);
|
||||
this.pendingMessageContext = null;
|
||||
this.pendingMessage = null;
|
||||
}
|
||||
}
|
||||
|
||||
private processPendingHalfClose() {
|
||||
if (this.pendingHalfClose) {
|
||||
this.nextCall.halfClose();
|
||||
}
|
||||
}
|
||||
|
||||
start(
|
||||
metadata: Metadata,
|
||||
interceptingListener?: Partial<InterceptingListener>
|
||||
@ -257,7 +282,9 @@ export class InterceptingCall implements InterceptingCallInterface {
|
||||
interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
|
||||
((status) => {}),
|
||||
};
|
||||
this.processingMetadata = true;
|
||||
this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
|
||||
this.processingMetadata = false;
|
||||
let finalInterceptingListener: InterceptingListener;
|
||||
if (isInterceptingListener(listener)) {
|
||||
finalInterceptingListener = listener;
|
||||
@ -276,6 +303,8 @@ export class InterceptingCall implements InterceptingCallInterface {
|
||||
);
|
||||
}
|
||||
this.nextCall.start(md, finalInterceptingListener);
|
||||
this.processPendingMessage();
|
||||
this.processPendingHalfClose();
|
||||
});
|
||||
}
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
@ -283,9 +312,12 @@ export class InterceptingCall implements InterceptingCallInterface {
|
||||
this.processingMessage = true;
|
||||
this.requester.sendMessage(message, (finalMessage) => {
|
||||
this.processingMessage = false;
|
||||
this.nextCall.sendMessageWithContext(context, finalMessage);
|
||||
if (this.pendingHalfClose) {
|
||||
this.nextCall.halfClose();
|
||||
if (this.processingMetadata) {
|
||||
this.pendingMessageContext = context;
|
||||
this.pendingMessage = message;
|
||||
} else {
|
||||
this.nextCall.sendMessageWithContext(context, finalMessage);
|
||||
this.processPendingHalfClose();
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -298,7 +330,7 @@ export class InterceptingCall implements InterceptingCallInterface {
|
||||
}
|
||||
halfClose(): void {
|
||||
this.requester.halfClose(() => {
|
||||
if (this.processingMessage) {
|
||||
if (this.processingMetadata || this.processingMessage) {
|
||||
this.pendingHalfClose = true;
|
||||
} else {
|
||||
this.nextCall.halfClose();
|
||||
|
||||
@ -36,9 +36,9 @@ export interface IntermediateObjectWritable<T> extends Writable {
|
||||
write(chunk: any & T, cb?: WriteCallback): boolean;
|
||||
write(chunk: any & T, encoding?: any, cb?: WriteCallback): boolean;
|
||||
setDefaultEncoding(encoding: string): this;
|
||||
end(): void;
|
||||
end(chunk: any & T, cb?: Function): void;
|
||||
end(chunk: any & T, encoding?: any, cb?: Function): void;
|
||||
end(): ReturnType<Writable['end']> extends Writable ? this : void;
|
||||
end(chunk: any & T, cb?: Function): ReturnType<Writable['end']> extends Writable ? this : void;
|
||||
end(chunk: any & T, encoding?: any, cb?: Function): ReturnType<Writable['end']> extends Writable ? this : void;
|
||||
}
|
||||
|
||||
export interface ObjectWritable<T> extends IntermediateObjectWritable<T> {
|
||||
@ -46,20 +46,7 @@ export interface ObjectWritable<T> extends IntermediateObjectWritable<T> {
|
||||
write(chunk: T, cb?: Function): boolean;
|
||||
write(chunk: T, encoding?: any, cb?: Function): boolean;
|
||||
setDefaultEncoding(encoding: string): this;
|
||||
end(): void;
|
||||
end(chunk: T, cb?: Function): void;
|
||||
end(chunk: T, encoding?: any, cb?: Function): void;
|
||||
end(): ReturnType<Writable['end']> extends Writable ? this : void;
|
||||
end(chunk: T, cb?: Function): ReturnType<Writable['end']> extends Writable ? this : void;
|
||||
end(chunk: T, encoding?: any, cb?: Function): ReturnType<Writable['end']> extends Writable ? this : void;
|
||||
}
|
||||
|
||||
export type ObjectDuplex<T, U> = {
|
||||
read(size?: number): U;
|
||||
|
||||
_write(chunk: T, encoding: string, callback: Function): void;
|
||||
write(chunk: T, cb?: Function): boolean;
|
||||
write(chunk: T, encoding?: any, cb?: Function): boolean;
|
||||
end(): void;
|
||||
end(chunk: T, cb?: Function): void;
|
||||
end(chunk: T, encoding?: any, cb?: Function): void;
|
||||
} & Duplex &
|
||||
ObjectWritable<T> &
|
||||
ObjectReadable<U>;
|
||||
|
||||
@ -26,7 +26,7 @@ import { ConnectivityState } from './connectivity-state';
|
||||
import { ConfigSelector, createResolver, Resolver } from './resolver';
|
||||
import { ServiceError } from './call';
|
||||
import { Picker, UnavailablePicker, QueuePicker } from './picker';
|
||||
import { BackoffTimeout } from './backoff-timeout';
|
||||
import { BackoffOptions, BackoffTimeout } from './backoff-timeout';
|
||||
import { Status } from './constants';
|
||||
import { StatusObject } from './call-stream';
|
||||
import { Metadata } from './metadata';
|
||||
@ -248,7 +248,10 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
},
|
||||
channelOptions
|
||||
);
|
||||
|
||||
const backoffOptions: BackoffOptions = {
|
||||
initialDelay: channelOptions['grpc.initial_reconnect_backoff_ms'],
|
||||
maxDelay: channelOptions['grpc.max_reconnect_backoff_ms'],
|
||||
};
|
||||
this.backoffTimeout = new BackoffTimeout(() => {
|
||||
if (this.continueResolving) {
|
||||
this.updateResolution();
|
||||
@ -256,7 +259,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
} else {
|
||||
this.updateState(this.latestChildState, this.latestChildPicker);
|
||||
}
|
||||
});
|
||||
}, backoffOptions);
|
||||
this.backoffTimeout.unref();
|
||||
}
|
||||
|
||||
|
||||
@ -238,7 +238,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
|
||||
this.trailingMetadata = metadata;
|
||||
}
|
||||
|
||||
super.end();
|
||||
return super.end();
|
||||
}
|
||||
}
|
||||
|
||||
@ -285,7 +285,7 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType>
|
||||
this.trailingMetadata = metadata;
|
||||
}
|
||||
|
||||
super.end();
|
||||
return super.end();
|
||||
}
|
||||
}
|
||||
|
||||
@ -295,7 +295,6 @@ ServerDuplexStreamImpl.prototype._write =
|
||||
ServerWritableStreamImpl.prototype._write;
|
||||
ServerDuplexStreamImpl.prototype._final =
|
||||
ServerWritableStreamImpl.prototype._final;
|
||||
ServerDuplexStreamImpl.prototype.end = ServerWritableStreamImpl.prototype.end;
|
||||
|
||||
// Unary response callback signature.
|
||||
export type sendUnaryData<ResponseType> = (
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user