mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
821 lines
30 KiB
TypeScript
821 lines
30 KiB
TypeScript
/*
|
|
* Copyright 2019 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
import {
|
|
Deadline,
|
|
Call,
|
|
Http2CallStream,
|
|
CallStreamOptions,
|
|
StatusObject,
|
|
} from './call-stream';
|
|
import { ChannelCredentials } from './channel-credentials';
|
|
import { ChannelOptions } from './channel-options';
|
|
import { ResolvingLoadBalancer } from './resolving-load-balancer';
|
|
import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
|
|
import { ChannelControlHelper } from './load-balancer';
|
|
import { UnavailablePicker, Picker, PickResultType } from './picker';
|
|
import { Metadata } from './metadata';
|
|
import { Status, LogVerbosity, Propagate } from './constants';
|
|
import { FilterStackFactory } from './filter-stack';
|
|
import { CallCredentialsFilterFactory } from './call-credentials-filter';
|
|
import { DeadlineFilterFactory } from './deadline-filter';
|
|
import { CompressionFilterFactory } from './compression-filter';
|
|
import {
|
|
CallConfig,
|
|
ConfigSelector,
|
|
getDefaultAuthority,
|
|
mapUriDefaultScheme,
|
|
} from './resolver';
|
|
import { trace, log } from './logging';
|
|
import { SubchannelAddress } from './subchannel-address';
|
|
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
|
|
import { mapProxyName } from './http_proxy';
|
|
import { GrpcUri, parseUri, uriToString } from './uri-parser';
|
|
import { ServerSurfaceCall } from './server-call';
|
|
import { Filter } from './filter';
|
|
|
|
import { ConnectivityState } from './connectivity-state';
|
|
import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
|
|
import { Subchannel } from './subchannel';
|
|
|
|
/**
|
|
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
|
|
*/
|
|
const MAX_TIMEOUT_TIME = 2147483647;
|
|
|
|
let nextCallNumber = 0;
|
|
|
|
function getNewCallNumber(): number {
|
|
const callNumber = nextCallNumber;
|
|
nextCallNumber += 1;
|
|
if (nextCallNumber >= Number.MAX_SAFE_INTEGER) {
|
|
nextCallNumber = 0;
|
|
}
|
|
return callNumber;
|
|
}
|
|
|
|
const INAPPROPRIATE_CONTROL_PLANE_CODES: Status[] = [
|
|
Status.OK,
|
|
Status.INVALID_ARGUMENT,
|
|
Status.NOT_FOUND,
|
|
Status.ALREADY_EXISTS,
|
|
Status.FAILED_PRECONDITION,
|
|
Status.ABORTED,
|
|
Status.OUT_OF_RANGE,
|
|
Status.DATA_LOSS
|
|
]
|
|
|
|
function restrictControlPlaneStatusCode(code: Status, details: string): {code: Status, details: string} {
|
|
if (INAPPROPRIATE_CONTROL_PLANE_CODES.includes(code)) {
|
|
return {
|
|
code: Status.INTERNAL,
|
|
details: `Invalid status from control plane: ${code} ${Status[code]} ${details}`
|
|
}
|
|
} else {
|
|
return {code, details};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* An interface that represents a communication channel to a server specified
|
|
* by a given address.
|
|
*/
|
|
export interface Channel {
|
|
/**
|
|
* Close the channel. This has the same functionality as the existing
|
|
* grpc.Client.prototype.close
|
|
*/
|
|
close(): void;
|
|
/**
|
|
* Return the target that this channel connects to
|
|
*/
|
|
getTarget(): string;
|
|
/**
|
|
* Get the channel's current connectivity state. This method is here mainly
|
|
* because it is in the existing internal Channel class, and there isn't
|
|
* another good place to put it.
|
|
* @param tryToConnect If true, the channel will start connecting if it is
|
|
* idle. Otherwise, idle channels will only start connecting when a
|
|
* call starts.
|
|
*/
|
|
getConnectivityState(tryToConnect: boolean): ConnectivityState;
|
|
/**
|
|
* Watch for connectivity state changes. This is also here mainly because
|
|
* it is in the existing external Channel class.
|
|
* @param currentState The state to watch for transitions from. This should
|
|
* always be populated by calling getConnectivityState immediately
|
|
* before.
|
|
* @param deadline A deadline for waiting for a state change
|
|
* @param callback Called with no error when a state change, or with an
|
|
* error if the deadline passes without a state change.
|
|
*/
|
|
watchConnectivityState(
|
|
currentState: ConnectivityState,
|
|
deadline: Date | number,
|
|
callback: (error?: Error) => void
|
|
): void;
|
|
/**
|
|
* Get the channelz reference object for this channel. A request to the
|
|
* channelz service for the id in this object will provide information
|
|
* about this channel.
|
|
*/
|
|
getChannelzRef(): ChannelRef;
|
|
/**
|
|
* Create a call object. Call is an opaque type that is used by the Client
|
|
* class. This function is called by the gRPC library when starting a
|
|
* request. Implementers should return an instance of Call that is returned
|
|
* from calling createCall on an instance of the provided Channel class.
|
|
* @param method The full method string to request.
|
|
* @param deadline The call deadline
|
|
* @param host A host string override for making the request
|
|
* @param parentCall A server call to propagate some information from
|
|
* @param propagateFlags A bitwise combination of elements of grpc.propagate
|
|
* that indicates what information to propagate from parentCall.
|
|
*/
|
|
createCall(
|
|
method: string,
|
|
deadline: Deadline,
|
|
host: string | null | undefined,
|
|
parentCall: ServerSurfaceCall | null,
|
|
propagateFlags: number | null | undefined
|
|
): Call;
|
|
}
|
|
|
|
interface ConnectivityStateWatcher {
|
|
currentState: ConnectivityState;
|
|
timer: NodeJS.Timeout | null;
|
|
callback: (error?: Error) => void;
|
|
}
|
|
|
|
export class ChannelImplementation implements Channel {
|
|
private resolvingLoadBalancer: ResolvingLoadBalancer;
|
|
private subchannelPool: SubchannelPool;
|
|
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
|
|
private currentPicker: Picker = new UnavailablePicker();
|
|
/**
|
|
* Calls queued up to get a call config. Should only be populated before the
|
|
* first time the resolver returns a result, which includes the ConfigSelector.
|
|
*/
|
|
private configSelectionQueue: Array<{
|
|
callStream: Http2CallStream;
|
|
callMetadata: Metadata;
|
|
}> = [];
|
|
private pickQueue: Array<{
|
|
callStream: Http2CallStream;
|
|
callMetadata: Metadata;
|
|
callConfig: CallConfig;
|
|
dynamicFilters: Filter[];
|
|
}> = [];
|
|
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
|
|
private defaultAuthority: string;
|
|
private filterStackFactory: FilterStackFactory;
|
|
private target: GrpcUri;
|
|
/**
|
|
* This timer does not do anything on its own. Its purpose is to hold the
|
|
* event loop open while there are any pending calls for the channel that
|
|
* have not yet been assigned to specific subchannels. In other words,
|
|
* the invariant is that callRefTimer is reffed if and only if pickQueue
|
|
* is non-empty.
|
|
*/
|
|
private callRefTimer: NodeJS.Timer;
|
|
private configSelector: ConfigSelector | null = null;
|
|
/**
|
|
* This is the error from the name resolver if it failed most recently. It
|
|
* is only used to end calls that start while there is no config selector
|
|
* and the name resolver is in backoff, so it should be nulled if
|
|
* configSelector becomes set or the channel state becomes anything other
|
|
* than TRANSIENT_FAILURE.
|
|
*/
|
|
private currentResolutionError: StatusObject | null = null;
|
|
|
|
// Channelz info
|
|
private readonly channelzEnabled: boolean = true;
|
|
private originalTarget: string;
|
|
private channelzRef: ChannelRef;
|
|
private channelzTrace: ChannelzTrace;
|
|
private callTracker = new ChannelzCallTracker();
|
|
private childrenTracker = new ChannelzChildrenTracker();
|
|
|
|
constructor(
|
|
target: string,
|
|
private readonly credentials: ChannelCredentials,
|
|
private readonly options: ChannelOptions
|
|
) {
|
|
if (typeof target !== 'string') {
|
|
throw new TypeError('Channel target must be a string');
|
|
}
|
|
if (!(credentials instanceof ChannelCredentials)) {
|
|
throw new TypeError(
|
|
'Channel credentials must be a ChannelCredentials object'
|
|
);
|
|
}
|
|
if (options) {
|
|
if (typeof options !== 'object') {
|
|
throw new TypeError('Channel options must be an object');
|
|
}
|
|
}
|
|
this.originalTarget = target;
|
|
const originalTargetUri = parseUri(target);
|
|
if (originalTargetUri === null) {
|
|
throw new Error(`Could not parse target name "${target}"`);
|
|
}
|
|
/* This ensures that the target has a scheme that is registered with the
|
|
* resolver */
|
|
const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
|
|
if (defaultSchemeMapResult === null) {
|
|
throw new Error(
|
|
`Could not find a default scheme for target name "${target}"`
|
|
);
|
|
}
|
|
|
|
this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
|
|
this.callRefTimer.unref?.();
|
|
|
|
if (this.options['grpc.enable_channelz'] === 0) {
|
|
this.channelzEnabled = false;
|
|
}
|
|
|
|
this.channelzTrace = new ChannelzTrace();
|
|
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled);
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
|
|
}
|
|
|
|
if (this.options['grpc.default_authority']) {
|
|
this.defaultAuthority = this.options['grpc.default_authority'] as string;
|
|
} else {
|
|
this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult);
|
|
}
|
|
const proxyMapResult = mapProxyName(defaultSchemeMapResult, options);
|
|
this.target = proxyMapResult.target;
|
|
this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);
|
|
|
|
/* The global boolean parameter to getSubchannelPool has the inverse meaning to what
|
|
* the grpc.use_local_subchannel_pool channel option means. */
|
|
this.subchannelPool = getSubchannelPool(
|
|
(options['grpc.use_local_subchannel_pool'] ?? 0) === 0
|
|
);
|
|
const channelControlHelper: ChannelControlHelper = {
|
|
createSubchannel: (
|
|
subchannelAddress: SubchannelAddress,
|
|
subchannelArgs: ChannelOptions
|
|
) => {
|
|
const subchannel = this.subchannelPool.getOrCreateSubchannel(
|
|
this.target,
|
|
subchannelAddress,
|
|
Object.assign({}, this.options, subchannelArgs),
|
|
this.credentials
|
|
);
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
|
|
}
|
|
return subchannel;
|
|
},
|
|
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
|
this.currentPicker = picker;
|
|
const queueCopy = this.pickQueue.slice();
|
|
this.pickQueue = [];
|
|
this.callRefTimerUnref();
|
|
for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) {
|
|
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
|
}
|
|
this.updateState(connectivityState);
|
|
},
|
|
requestReresolution: () => {
|
|
// This should never be called.
|
|
throw new Error(
|
|
'Resolving load balancer should never call requestReresolution'
|
|
);
|
|
},
|
|
addChannelzChild: (child: ChannelRef | SubchannelRef) => {
|
|
if (this.channelzEnabled) {
|
|
this.childrenTracker.refChild(child);
|
|
}
|
|
},
|
|
removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
|
|
if (this.channelzEnabled) {
|
|
this.childrenTracker.unrefChild(child);
|
|
}
|
|
}
|
|
};
|
|
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
|
|
this.target,
|
|
channelControlHelper,
|
|
options,
|
|
(configSelector) => {
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
|
|
}
|
|
this.configSelector = configSelector;
|
|
this.currentResolutionError = null;
|
|
/* We process the queue asynchronously to ensure that the corresponding
|
|
* load balancer update has completed. */
|
|
process.nextTick(() => {
|
|
const localQueue = this.configSelectionQueue;
|
|
this.configSelectionQueue = [];
|
|
this.callRefTimerUnref();
|
|
for (const { callStream, callMetadata } of localQueue) {
|
|
this.tryGetConfig(callStream, callMetadata);
|
|
}
|
|
this.configSelectionQueue = [];
|
|
});
|
|
},
|
|
(status) => {
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
|
|
}
|
|
if (this.configSelectionQueue.length > 0) {
|
|
this.trace('Name resolution failed with calls queued for config selection');
|
|
}
|
|
if (this.configSelector === null) {
|
|
this.currentResolutionError = {...restrictControlPlaneStatusCode(status.code, status.details), metadata: status.metadata};
|
|
}
|
|
const localQueue = this.configSelectionQueue;
|
|
this.configSelectionQueue = [];
|
|
this.callRefTimerUnref();
|
|
for (const { callStream, callMetadata } of localQueue) {
|
|
if (callMetadata.getOptions().waitForReady) {
|
|
this.callRefTimerRef();
|
|
this.configSelectionQueue.push({ callStream, callMetadata });
|
|
} else {
|
|
callStream.cancelWithStatus(status.code, status.details);
|
|
}
|
|
}
|
|
}
|
|
);
|
|
this.filterStackFactory = new FilterStackFactory([
|
|
new CallCredentialsFilterFactory(this),
|
|
new DeadlineFilterFactory(this),
|
|
new MaxMessageSizeFilterFactory(this.options),
|
|
new CompressionFilterFactory(this, this.options),
|
|
]);
|
|
this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
|
|
const error = new Error();
|
|
trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1));
|
|
}
|
|
|
|
private getChannelzInfo(): ChannelInfo {
|
|
return {
|
|
target: this.originalTarget,
|
|
state: this.connectivityState,
|
|
trace: this.channelzTrace,
|
|
callTracker: this.callTracker,
|
|
children: this.childrenTracker.getChildLists()
|
|
};
|
|
}
|
|
|
|
private trace(text: string, verbosityOverride?: LogVerbosity) {
|
|
trace(verbosityOverride ?? LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text);
|
|
}
|
|
|
|
private callRefTimerRef() {
|
|
// If the hasRef function does not exist, always run the code
|
|
if (!this.callRefTimer.hasRef?.()) {
|
|
this.trace(
|
|
'callRefTimer.ref | configSelectionQueue.length=' +
|
|
this.configSelectionQueue.length +
|
|
' pickQueue.length=' +
|
|
this.pickQueue.length
|
|
);
|
|
this.callRefTimer.ref?.();
|
|
}
|
|
}
|
|
|
|
private callRefTimerUnref() {
|
|
// If the hasRef function does not exist, always run the code
|
|
if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
|
|
this.trace(
|
|
'callRefTimer.unref | configSelectionQueue.length=' +
|
|
this.configSelectionQueue.length +
|
|
' pickQueue.length=' +
|
|
this.pickQueue.length
|
|
);
|
|
this.callRefTimer.unref?.();
|
|
}
|
|
}
|
|
|
|
private pushPick(
|
|
callStream: Http2CallStream,
|
|
callMetadata: Metadata,
|
|
callConfig: CallConfig,
|
|
dynamicFilters: Filter[]
|
|
) {
|
|
this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters });
|
|
this.callRefTimerRef();
|
|
}
|
|
|
|
/**
|
|
* Check the picker output for the given call and corresponding metadata,
|
|
* and take any relevant actions. Should not be called while iterating
|
|
* over pickQueue.
|
|
* @param callStream
|
|
* @param callMetadata
|
|
*/
|
|
private tryPick(
|
|
callStream: Http2CallStream,
|
|
callMetadata: Metadata,
|
|
callConfig: CallConfig,
|
|
dynamicFilters: Filter[]
|
|
) {
|
|
const pickResult = this.currentPicker.pick({
|
|
metadata: callMetadata,
|
|
extraPickInfo: callConfig.pickInformation,
|
|
});
|
|
const subchannelString = pickResult.subchannel ?
|
|
'(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
|
|
'' + pickResult.subchannel;
|
|
this.trace(
|
|
'Pick result for call [' +
|
|
callStream.getCallNumber() +
|
|
']: ' +
|
|
PickResultType[pickResult.pickResultType] +
|
|
' subchannel: ' +
|
|
subchannelString +
|
|
' status: ' +
|
|
pickResult.status?.code +
|
|
' ' +
|
|
pickResult.status?.details
|
|
);
|
|
switch (pickResult.pickResultType) {
|
|
case PickResultType.COMPLETE:
|
|
if (pickResult.subchannel === null) {
|
|
callStream.cancelWithStatus(
|
|
Status.UNAVAILABLE,
|
|
'Request dropped by load balancing policy'
|
|
);
|
|
// End the call with an error
|
|
} else {
|
|
/* If the subchannel is not in the READY state, that indicates a bug
|
|
* somewhere in the load balancer or picker. So, we log an error and
|
|
* queue the pick to be tried again later. */
|
|
if (
|
|
pickResult.subchannel!.getConnectivityState() !==
|
|
ConnectivityState.READY
|
|
) {
|
|
log(
|
|
LogVerbosity.ERROR,
|
|
'Error: COMPLETE pick result subchannel ' +
|
|
subchannelString +
|
|
' has state ' +
|
|
ConnectivityState[pickResult.subchannel!.getConnectivityState()]
|
|
);
|
|
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
|
break;
|
|
}
|
|
/* We need to clone the callMetadata here because the transparent
|
|
* retry code in the promise resolution handler use the same
|
|
* callMetadata object, so it needs to stay unmodified */
|
|
callStream.filterStack
|
|
.sendMetadata(Promise.resolve(callMetadata.clone()))
|
|
.then(
|
|
(finalMetadata) => {
|
|
const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState();
|
|
if (subchannelState === ConnectivityState.READY) {
|
|
try {
|
|
const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
|
|
pickResult.subchannel?.getRealSubchannel().startCallStream(
|
|
finalMetadata,
|
|
callStream,
|
|
[...dynamicFilters, ...pickExtraFilters]
|
|
);
|
|
/* If we reach this point, the call stream has started
|
|
* successfully */
|
|
callConfig.onCommitted?.();
|
|
pickResult.onCallStarted?.();
|
|
} catch (error) {
|
|
const errorCode = (error as NodeJS.ErrnoException).code;
|
|
if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' ||
|
|
errorCode === 'ERR_HTTP2_INVALID_SESSION'
|
|
) {
|
|
/* An error here indicates that something went wrong with
|
|
* the picked subchannel's http2 stream right before we
|
|
* tried to start the stream. We are handling a promise
|
|
* result here, so this is asynchronous with respect to the
|
|
* original tryPick call, so calling it again is not
|
|
* recursive. We call tryPick immediately instead of
|
|
* queueing this pick again because handling the queue is
|
|
* triggered by state changes, and we want to immediately
|
|
* check if the state has already changed since the
|
|
* previous tryPick call. We do this instead of cancelling
|
|
* the stream because the correct behavior may be
|
|
* re-queueing instead, based on the logic in the rest of
|
|
* tryPick */
|
|
this.trace(
|
|
'Failed to start call on picked subchannel ' +
|
|
subchannelString +
|
|
' with error ' +
|
|
(error as Error).message +
|
|
'. Retrying pick',
|
|
LogVerbosity.INFO
|
|
);
|
|
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
|
} else {
|
|
this.trace(
|
|
'Failed to start call on picked subchanel ' +
|
|
subchannelString +
|
|
' with error ' +
|
|
(error as Error).message +
|
|
'. Ending call',
|
|
LogVerbosity.INFO
|
|
);
|
|
callStream.cancelWithStatus(
|
|
Status.INTERNAL,
|
|
`Failed to start HTTP/2 stream with error: ${
|
|
(error as Error).message
|
|
}`
|
|
);
|
|
}
|
|
}
|
|
} else {
|
|
/* The logic for doing this here is the same as in the catch
|
|
* block above */
|
|
this.trace(
|
|
'Picked subchannel ' +
|
|
subchannelString +
|
|
' has state ' +
|
|
ConnectivityState[subchannelState] +
|
|
' after metadata filters. Retrying pick',
|
|
LogVerbosity.INFO
|
|
);
|
|
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
|
}
|
|
},
|
|
(error: Error & { code: number }) => {
|
|
// We assume the error code isn't 0 (Status.OK)
|
|
const {code, details} = restrictControlPlaneStatusCode(
|
|
typeof error.code === 'number' ? error.code : Status.UNKNOWN,
|
|
`Getting metadata from plugin failed with error: ${error.message}`
|
|
)
|
|
callStream.cancelWithStatus(code, details);
|
|
}
|
|
);
|
|
}
|
|
break;
|
|
case PickResultType.QUEUE:
|
|
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
|
break;
|
|
case PickResultType.TRANSIENT_FAILURE:
|
|
if (callMetadata.getOptions().waitForReady) {
|
|
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
|
} else {
|
|
const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
|
|
callStream.cancelWithStatus(code, details);
|
|
}
|
|
break;
|
|
case PickResultType.DROP:
|
|
const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
|
|
callStream.cancelWithStatus(code, details);
|
|
break;
|
|
default:
|
|
throw new Error(
|
|
`Invalid state: unknown pickResultType ${pickResult.pickResultType}`
|
|
);
|
|
}
|
|
}
|
|
|
|
private removeConnectivityStateWatcher(
|
|
watcherObject: ConnectivityStateWatcher
|
|
) {
|
|
const watcherIndex = this.connectivityStateWatchers.findIndex(
|
|
(value) => value === watcherObject
|
|
);
|
|
if (watcherIndex >= 0) {
|
|
this.connectivityStateWatchers.splice(watcherIndex, 1);
|
|
}
|
|
}
|
|
|
|
private updateState(newState: ConnectivityState): void {
|
|
trace(
|
|
LogVerbosity.DEBUG,
|
|
'connectivity_state',
|
|
'(' + this.channelzRef.id + ') ' +
|
|
uriToString(this.target) +
|
|
' ' +
|
|
ConnectivityState[this.connectivityState] +
|
|
' -> ' +
|
|
ConnectivityState[newState]
|
|
);
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
|
|
}
|
|
this.connectivityState = newState;
|
|
const watchersCopy = this.connectivityStateWatchers.slice();
|
|
for (const watcherObject of watchersCopy) {
|
|
if (newState !== watcherObject.currentState) {
|
|
if (watcherObject.timer) {
|
|
clearTimeout(watcherObject.timer);
|
|
}
|
|
this.removeConnectivityStateWatcher(watcherObject);
|
|
watcherObject.callback();
|
|
}
|
|
}
|
|
if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
|
|
this.currentResolutionError = null;
|
|
}
|
|
}
|
|
|
|
private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
|
|
if (stream.getStatus() !== null) {
|
|
/* If the stream has a status, it has already finished and we don't need
|
|
* to take any more actions on it. */
|
|
return;
|
|
}
|
|
if (this.configSelector === null) {
|
|
/* This branch will only be taken at the beginning of the channel's life,
|
|
* before the resolver ever returns a result. So, the
|
|
* ResolvingLoadBalancer may be idle and if so it needs to be kicked
|
|
* because it now has a pending request. */
|
|
this.resolvingLoadBalancer.exitIdle();
|
|
if (this.currentResolutionError && !metadata.getOptions().waitForReady) {
|
|
stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details);
|
|
} else {
|
|
this.configSelectionQueue.push({
|
|
callStream: stream,
|
|
callMetadata: metadata,
|
|
});
|
|
this.callRefTimerRef();
|
|
}
|
|
} else {
|
|
const callConfig = this.configSelector(stream.getMethod(), metadata);
|
|
if (callConfig.status === Status.OK) {
|
|
if (callConfig.methodConfig.timeout) {
|
|
const deadline = new Date();
|
|
deadline.setSeconds(
|
|
deadline.getSeconds() + callConfig.methodConfig.timeout.seconds
|
|
);
|
|
deadline.setMilliseconds(
|
|
deadline.getMilliseconds() +
|
|
callConfig.methodConfig.timeout.nanos / 1_000_000
|
|
);
|
|
stream.setConfigDeadline(deadline);
|
|
// Refreshing the filters makes the deadline filter pick up the new deadline
|
|
stream.filterStack.refresh();
|
|
}
|
|
if (callConfig.dynamicFilterFactories.length > 0) {
|
|
/* These dynamicFilters are the mechanism for implementing gRFC A39:
|
|
* https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md
|
|
* We run them here instead of with the rest of the filters because
|
|
* that spec says "the xDS HTTP filters will run in between name
|
|
* resolution and load balancing".
|
|
*
|
|
* We use the filter stack here to simplify the multi-filter async
|
|
* waterfall logic, but we pass along the underlying list of filters
|
|
* to avoid having nested filter stacks when combining it with the
|
|
* original filter stack. We do not pass along the original filter
|
|
* factory list because these filters may need to persist data
|
|
* between sending headers and other operations. */
|
|
const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories);
|
|
const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream);
|
|
dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => {
|
|
this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters());
|
|
});
|
|
} else {
|
|
this.tryPick(stream, metadata, callConfig, []);
|
|
}
|
|
} else {
|
|
const {code, details} = restrictControlPlaneStatusCode(callConfig.status, 'Failed to route call to method ' + stream.getMethod());
|
|
stream.cancelWithStatus(code, details);
|
|
}
|
|
}
|
|
}
|
|
|
|
_startCallStream(stream: Http2CallStream, metadata: Metadata) {
|
|
this.tryGetConfig(stream, metadata.clone());
|
|
}
|
|
|
|
close() {
|
|
this.resolvingLoadBalancer.destroy();
|
|
this.updateState(ConnectivityState.SHUTDOWN);
|
|
clearInterval(this.callRefTimer);
|
|
if (this.channelzEnabled) {
|
|
unregisterChannelzRef(this.channelzRef);
|
|
}
|
|
|
|
this.subchannelPool.unrefUnusedSubchannels();
|
|
}
|
|
|
|
getTarget() {
|
|
return uriToString(this.target);
|
|
}
|
|
|
|
getConnectivityState(tryToConnect: boolean) {
|
|
const connectivityState = this.connectivityState;
|
|
if (tryToConnect) {
|
|
this.resolvingLoadBalancer.exitIdle();
|
|
}
|
|
return connectivityState;
|
|
}
|
|
|
|
watchConnectivityState(
|
|
currentState: ConnectivityState,
|
|
deadline: Date | number,
|
|
callback: (error?: Error) => void
|
|
): void {
|
|
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
|
throw new Error('Channel has been shut down');
|
|
}
|
|
let timer = null;
|
|
if (deadline !== Infinity) {
|
|
const deadlineDate: Date =
|
|
deadline instanceof Date ? deadline : new Date(deadline);
|
|
const now = new Date();
|
|
if (deadline === -Infinity || deadlineDate <= now) {
|
|
process.nextTick(
|
|
callback,
|
|
new Error('Deadline passed without connectivity state change')
|
|
);
|
|
return;
|
|
}
|
|
timer = setTimeout(() => {
|
|
this.removeConnectivityStateWatcher(watcherObject);
|
|
callback(
|
|
new Error('Deadline passed without connectivity state change')
|
|
);
|
|
}, deadlineDate.getTime() - now.getTime());
|
|
}
|
|
const watcherObject = {
|
|
currentState,
|
|
callback,
|
|
timer,
|
|
};
|
|
this.connectivityStateWatchers.push(watcherObject);
|
|
}
|
|
|
|
/**
|
|
* Get the channelz reference object for this channel. The returned value is
|
|
* garbage if channelz is disabled for this channel.
|
|
* @returns
|
|
*/
|
|
getChannelzRef() {
|
|
return this.channelzRef;
|
|
}
|
|
|
|
createCall(
|
|
method: string,
|
|
deadline: Deadline,
|
|
host: string | null | undefined,
|
|
parentCall: ServerSurfaceCall | null,
|
|
propagateFlags: number | null | undefined
|
|
): Call {
|
|
if (typeof method !== 'string') {
|
|
throw new TypeError('Channel#createCall: method must be a string');
|
|
}
|
|
if (!(typeof deadline === 'number' || deadline instanceof Date)) {
|
|
throw new TypeError(
|
|
'Channel#createCall: deadline must be a number or Date'
|
|
);
|
|
}
|
|
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
|
throw new Error('Channel has been shut down');
|
|
}
|
|
const callNumber = getNewCallNumber();
|
|
this.trace(
|
|
'createCall [' +
|
|
callNumber +
|
|
'] method="' +
|
|
method +
|
|
'", deadline=' +
|
|
deadline
|
|
);
|
|
const finalOptions: CallStreamOptions = {
|
|
deadline: deadline,
|
|
flags: propagateFlags ?? Propagate.DEFAULTS,
|
|
host: host ?? this.defaultAuthority,
|
|
parentCall: parentCall,
|
|
};
|
|
const stream: Http2CallStream = new Http2CallStream(
|
|
method,
|
|
this,
|
|
finalOptions,
|
|
this.filterStackFactory,
|
|
this.credentials._getCallCredentials(),
|
|
callNumber
|
|
);
|
|
if (this.channelzEnabled) {
|
|
this.callTracker.addCallStarted();
|
|
stream.addStatusWatcher(status => {
|
|
if (status.code === Status.OK) {
|
|
this.callTracker.addCallSucceeded();
|
|
} else {
|
|
this.callTracker.addCallFailed();
|
|
}
|
|
});
|
|
}
|
|
return stream;
|
|
}
|
|
}
|