mirror of
https://github.com/grpc/grpc-node.git
synced 2026-02-01 14:54:35 +00:00
Merge branch 'master' into grpc-js_round_robin
This commit is contained in:
commit
a53b36d680
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@grpc/grpc-js",
|
||||
"version": "0.6.4",
|
||||
"version": "0.6.9",
|
||||
"description": "gRPC Library for Node - pure JS implementation",
|
||||
"homepage": "https://grpc.io/",
|
||||
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
|
||||
|
||||
@ -68,6 +68,20 @@ export type ClientDuplexStream<
|
||||
ResponseType
|
||||
> = ClientWritableStream<RequestType> & ClientReadableStream<ResponseType>;
|
||||
|
||||
/**
|
||||
* Construct a ServiceError from a StatusObject. This function exists primarily
|
||||
* as an attempt to make the error stack trace clearly communicate that the
|
||||
* error is not necessarily a problem in gRPC itself.
|
||||
* @param status
|
||||
*/
|
||||
export function callErrorFromStatus(status: StatusObject): ServiceError {
|
||||
const message = `${status.code} ${Status[status.code]}: ${status.details}`;
|
||||
return Object.assign(
|
||||
new Error(message),
|
||||
status
|
||||
);
|
||||
}
|
||||
|
||||
export class ClientUnaryCallImpl extends EventEmitter
|
||||
implements ClientUnaryCall {
|
||||
constructor(private readonly call: Call) {
|
||||
@ -118,11 +132,7 @@ function setUpReadableStream<ResponseType>(
|
||||
});
|
||||
call.on('status', (status: StatusObject) => {
|
||||
if (status.code !== Status.OK) {
|
||||
const error: ServiceError = Object.assign(
|
||||
new Error(status.details),
|
||||
status
|
||||
);
|
||||
stream.emit('error', error);
|
||||
stream.emit('error', callErrorFromStatus(status));
|
||||
}
|
||||
stream.emit('status', status);
|
||||
statusEmitted = true;
|
||||
|
||||
@ -28,7 +28,7 @@ import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
|
||||
import { ChannelControlHelper } from './load-balancer';
|
||||
import { UnavailablePicker, Picker, PickResultType } from './picker';
|
||||
import { Metadata } from './metadata';
|
||||
import { Status } from './constants';
|
||||
import { Status, LogVerbosity } from './constants';
|
||||
import { FilterStackFactory } from './filter-stack';
|
||||
import { CallCredentialsFilterFactory } from './call-credentials-filter';
|
||||
import { DeadlineFilterFactory } from './deadline-filter';
|
||||
@ -37,6 +37,7 @@ import { CompressionFilterFactory } from './compression-filter';
|
||||
import { getDefaultAuthority } from './resolver';
|
||||
import { LoadBalancingConfig } from './load-balancing-config';
|
||||
import { ServiceConfig, validateServiceConfig } from './service-config';
|
||||
import { trace } from './logging';
|
||||
|
||||
export enum ConnectivityState {
|
||||
CONNECTING,
|
||||
@ -268,6 +269,7 @@ export class ChannelImplementation implements Channel {
|
||||
}
|
||||
|
||||
private updateState(newState: ConnectivityState): void {
|
||||
trace(LogVerbosity.DEBUG, 'connectivity_state', this.target + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
|
||||
this.connectivityState = newState;
|
||||
const watchersCopy = this.connectivityStateWatchers.slice();
|
||||
for (const watcherObject of watchersCopy) {
|
||||
|
||||
@ -25,6 +25,7 @@ import {
|
||||
ClientWritableStream,
|
||||
ClientWritableStreamImpl,
|
||||
ServiceError,
|
||||
callErrorFromStatus,
|
||||
} from './call';
|
||||
import { CallCredentials } from './call-credentials';
|
||||
import { Call, Deadline, StatusObject, WriteObject } from './call-stream';
|
||||
@ -151,11 +152,7 @@ export class Client {
|
||||
if (status.code === Status.OK) {
|
||||
callback(null, responseMessage as ResponseType);
|
||||
} else {
|
||||
const error: ServiceError = Object.assign(
|
||||
new Error(status.details),
|
||||
status
|
||||
);
|
||||
callback(error);
|
||||
callback(callErrorFromStatus(status));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -36,6 +36,7 @@ import {
|
||||
loadPackageDefinition,
|
||||
makeClientConstructor,
|
||||
Serialize,
|
||||
ServiceDefinition
|
||||
} from './make-client';
|
||||
import { Metadata } from './metadata';
|
||||
import {
|
||||
@ -46,6 +47,9 @@ import {
|
||||
import { KeyCertPair, ServerCredentials } from './server-credentials';
|
||||
import { StatusBuilder } from './status-builder';
|
||||
import {
|
||||
handleBidiStreamingCall,
|
||||
handleServerStreamingCall,
|
||||
handleUnaryCall,
|
||||
ServerUnaryCall,
|
||||
ServerReadableStream,
|
||||
ServerWritableStream,
|
||||
@ -227,10 +231,19 @@ export {
|
||||
ServerReadableStream,
|
||||
ServerWritableStream,
|
||||
ServerDuplexStream,
|
||||
ServiceDefinition,
|
||||
UntypedHandleCall,
|
||||
UntypedServiceImplementation,
|
||||
};
|
||||
|
||||
/**** Server ****/
|
||||
|
||||
export {
|
||||
handleBidiStreamingCall,
|
||||
handleServerStreamingCall,
|
||||
handleUnaryCall,
|
||||
};
|
||||
|
||||
/* tslint:disable:no-any */
|
||||
export type Call =
|
||||
| ClientUnaryCall
|
||||
|
||||
@ -31,6 +31,14 @@ import {
|
||||
} from './picker';
|
||||
import { LoadBalancingConfig } from './load-balancing-config';
|
||||
import { Subchannel, ConnectivityStateListener } from './subchannel';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
|
||||
const TRACER_NAME = 'pick_first';
|
||||
|
||||
function trace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
const TYPE_NAME = 'pick_first';
|
||||
|
||||
@ -56,6 +64,14 @@ class PickFirstPicker implements Picker {
|
||||
}
|
||||
}
|
||||
|
||||
interface ConnectivityStateCounts {
|
||||
[ConnectivityState.CONNECTING]: number,
|
||||
[ConnectivityState.IDLE]: number,
|
||||
[ConnectivityState.READY]: number,
|
||||
[ConnectivityState.SHUTDOWN]: number,
|
||||
[ConnectivityState.TRANSIENT_FAILURE]: number
|
||||
}
|
||||
|
||||
export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
/**
|
||||
* The list of backend addresses most recently passed to `updateAddressList`.
|
||||
@ -75,11 +91,8 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
* recently started connection attempt.
|
||||
*/
|
||||
private currentSubchannelIndex = 0;
|
||||
/**
|
||||
* The number of subchannels in the `subchannels` list currently in the
|
||||
* CONNECTING state. Used to determine the overall load balancer state.
|
||||
*/
|
||||
private subchannelConnectingCount = 0;
|
||||
|
||||
private subchannelStateCounts: ConnectivityStateCounts;
|
||||
/**
|
||||
* The currently picked subchannel used for making calls. Populated if
|
||||
* and only if the load balancer's current state is READY. In that case,
|
||||
@ -111,17 +124,20 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
*/
|
||||
constructor(private channelControlHelper: ChannelControlHelper) {
|
||||
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
||||
this.subchannelStateCounts = {
|
||||
[ConnectivityState.CONNECTING]: 0,
|
||||
[ConnectivityState.IDLE]: 0,
|
||||
[ConnectivityState.READY]: 0,
|
||||
[ConnectivityState.SHUTDOWN]: 0,
|
||||
[ConnectivityState.TRANSIENT_FAILURE]: 0
|
||||
};
|
||||
this.subchannelStateListener = (
|
||||
subchannel: Subchannel,
|
||||
previousState: ConnectivityState,
|
||||
newState: ConnectivityState
|
||||
) => {
|
||||
if (previousState === ConnectivityState.CONNECTING) {
|
||||
this.subchannelConnectingCount -= 1;
|
||||
}
|
||||
if (newState === ConnectivityState.CONNECTING) {
|
||||
this.subchannelConnectingCount += 1;
|
||||
}
|
||||
this.subchannelStateCounts[previousState] -= 1;
|
||||
this.subchannelStateCounts[newState] += 1;
|
||||
/* If the subchannel we most recently attempted to start connecting
|
||||
* to goes into TRANSIENT_FAILURE, immediately try to start
|
||||
* connecting to the next one instead of waiting for the connection
|
||||
@ -136,12 +152,22 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
this.pickSubchannel(subchannel);
|
||||
return;
|
||||
} else {
|
||||
if (this.triedAllSubchannels && this.subchannelStateCounts[ConnectivityState.IDLE] === this.subchannels.length) {
|
||||
/* If all of the subchannels are IDLE we should go back to a
|
||||
* basic IDLE state where there is no subchannel list to avoid
|
||||
* holding unused resources */
|
||||
this.resetSubchannelList();
|
||||
}
|
||||
if (this.currentPick === null) {
|
||||
if (this.triedAllSubchannels) {
|
||||
const newLBState =
|
||||
this.subchannelConnectingCount > 0
|
||||
? ConnectivityState.CONNECTING
|
||||
: ConnectivityState.TRANSIENT_FAILURE;
|
||||
let newLBState: ConnectivityState;
|
||||
if (this.subchannelStateCounts[ConnectivityState.CONNECTING] > 0) {
|
||||
newLBState = ConnectivityState.CONNECTING;
|
||||
} else if (this.subchannelStateCounts[ConnectivityState.TRANSIENT_FAILURE] > 0) {
|
||||
newLBState = ConnectivityState.TRANSIENT_FAILURE;
|
||||
} else {
|
||||
newLBState = ConnectivityState.IDLE;
|
||||
}
|
||||
if (newLBState !== this.currentState) {
|
||||
if (newLBState === ConnectivityState.TRANSIENT_FAILURE) {
|
||||
this.updateState(newLBState, new UnavailablePicker());
|
||||
@ -164,23 +190,38 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
newState: ConnectivityState
|
||||
) => {
|
||||
if (newState !== ConnectivityState.READY) {
|
||||
this.currentPick = null;
|
||||
subchannel.unref();
|
||||
subchannel.removeConnectivityStateListener(
|
||||
this.pickedSubchannelStateListener
|
||||
);
|
||||
if (this.subchannels.length > 0) {
|
||||
const newLBState =
|
||||
this.subchannelConnectingCount > 0
|
||||
? ConnectivityState.CONNECTING
|
||||
: ConnectivityState.TRANSIENT_FAILURE;
|
||||
if (newLBState === ConnectivityState.TRANSIENT_FAILURE) {
|
||||
this.updateState(newLBState, new UnavailablePicker());
|
||||
if (this.triedAllSubchannels) {
|
||||
let newLBState: ConnectivityState;
|
||||
if (this.subchannelStateCounts[ConnectivityState.CONNECTING] > 0) {
|
||||
newLBState = ConnectivityState.CONNECTING;
|
||||
} else if (this.subchannelStateCounts[ConnectivityState.TRANSIENT_FAILURE] > 0) {
|
||||
newLBState = ConnectivityState.TRANSIENT_FAILURE;
|
||||
} else {
|
||||
newLBState = ConnectivityState.IDLE;
|
||||
}
|
||||
if (newLBState === ConnectivityState.TRANSIENT_FAILURE) {
|
||||
this.updateState(newLBState, new UnavailablePicker());
|
||||
} else {
|
||||
this.updateState(newLBState, new QueuePicker(this));
|
||||
}
|
||||
} else {
|
||||
this.updateState(newLBState, new QueuePicker(this));
|
||||
this.updateState(
|
||||
ConnectivityState.CONNECTING,
|
||||
new QueuePicker(this)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
this.connectToAddressList();
|
||||
this.channelControlHelper.requestReresolution();
|
||||
/* We don't need to backoff here because this only happens if a
|
||||
* subchannel successfully connects then disconnects, so it will not
|
||||
* create a loop of attempting to connect to an unreachable backend
|
||||
*/
|
||||
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -218,6 +259,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
this.subchannels[subchannelIndex].getConnectivityState() ===
|
||||
ConnectivityState.IDLE
|
||||
) {
|
||||
trace('Start connecting to subchannel with address ' + this.subchannels[subchannelIndex].getAddress());
|
||||
process.nextTick(() => {
|
||||
this.subchannels[subchannelIndex].startConnecting();
|
||||
});
|
||||
@ -228,6 +270,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
}
|
||||
|
||||
private pickSubchannel(subchannel: Subchannel) {
|
||||
trace('Pick subchannel with address ' + subchannel.getAddress());
|
||||
if (this.currentPick !== null) {
|
||||
this.currentPick.unref();
|
||||
this.currentPick.removeConnectivityStateListener(
|
||||
@ -243,6 +286,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
}
|
||||
|
||||
private updateState(newState: ConnectivityState, picker: Picker) {
|
||||
trace(ConnectivityState[this.currentState] + ' -> ' + ConnectivityState[newState]);
|
||||
this.currentState = newState;
|
||||
this.channelControlHelper.updateState(newState, picker);
|
||||
}
|
||||
@ -253,7 +297,13 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
subchannel.unref();
|
||||
}
|
||||
this.currentSubchannelIndex = 0;
|
||||
this.subchannelConnectingCount = 0;
|
||||
this.subchannelStateCounts = {
|
||||
[ConnectivityState.CONNECTING]: 0,
|
||||
[ConnectivityState.IDLE]: 0,
|
||||
[ConnectivityState.READY]: 0,
|
||||
[ConnectivityState.SHUTDOWN]: 0,
|
||||
[ConnectivityState.TRANSIENT_FAILURE]: 0
|
||||
};
|
||||
this.subchannels = [];
|
||||
this.triedAllSubchannels = false;
|
||||
}
|
||||
@ -264,6 +314,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
*/
|
||||
private connectToAddressList(): void {
|
||||
this.resetSubchannelList();
|
||||
trace('Connect to address list ' + this.latestAddressList);
|
||||
this.subchannels = this.latestAddressList.map(address =>
|
||||
this.channelControlHelper.createSubchannel(address, {})
|
||||
);
|
||||
@ -274,10 +325,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
subchannel.addConnectivityStateListener(this.subchannelStateListener);
|
||||
if (subchannel.getConnectivityState() === ConnectivityState.READY) {
|
||||
this.pickSubchannel(subchannel);
|
||||
this.updateState(
|
||||
ConnectivityState.READY,
|
||||
new PickFirstPicker(subchannel)
|
||||
);
|
||||
this.resetSubchannelList();
|
||||
return;
|
||||
}
|
||||
@ -289,15 +336,19 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
subchannelState === ConnectivityState.CONNECTING
|
||||
) {
|
||||
this.startConnecting(index);
|
||||
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
|
||||
if (this.currentPick === null) {
|
||||
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
// If the code reaches this point, every subchannel must be in TRANSIENT_FAILURE
|
||||
this.updateState(
|
||||
ConnectivityState.TRANSIENT_FAILURE,
|
||||
new UnavailablePicker()
|
||||
);
|
||||
if (this.currentPick === null) {
|
||||
this.updateState(
|
||||
ConnectivityState.TRANSIENT_FAILURE,
|
||||
new UnavailablePicker()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
updateAddressList(
|
||||
@ -305,8 +356,13 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
lbConfig: LoadBalancingConfig | null
|
||||
): void {
|
||||
// lbConfig has no useful information for pick first load balancing
|
||||
this.latestAddressList = addressList;
|
||||
this.connectToAddressList();
|
||||
/* To avoid unnecessary churn, we only do something with this address list
|
||||
* if we're not currently trying to establish a connection, or if the new
|
||||
* address list is different from the existing one */
|
||||
if (this.subchannels.length === 0 || !this.latestAddressList.every((value, index) => addressList[index] === value)) {
|
||||
this.latestAddressList = addressList;
|
||||
this.connectToAddressList();
|
||||
}
|
||||
}
|
||||
|
||||
exitIdle() {
|
||||
@ -318,6 +374,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
this.connectToAddressList();
|
||||
}
|
||||
}
|
||||
if (this.currentState === ConnectivityState.IDLE || this.triedAllSubchannels) {
|
||||
this.channelControlHelper.requestReresolution();
|
||||
}
|
||||
}
|
||||
|
||||
resetBackoff() {
|
||||
|
||||
@ -18,7 +18,23 @@
|
||||
import { LogVerbosity } from './constants';
|
||||
|
||||
let _logger: Partial<Console> = console;
|
||||
let _logVerbosity: LogVerbosity = LogVerbosity.DEBUG;
|
||||
let _logVerbosity: LogVerbosity = LogVerbosity.ERROR;
|
||||
|
||||
if (process.env.GRPC_VERBOSITY) {
|
||||
switch (process.env.GRPC_VERBOSITY) {
|
||||
case 'DEBUG':
|
||||
_logVerbosity = LogVerbosity.DEBUG;
|
||||
break;
|
||||
case 'INFO':
|
||||
_logVerbosity = LogVerbosity.INFO;
|
||||
break;
|
||||
case 'ERROR':
|
||||
_logVerbosity = LogVerbosity.ERROR;
|
||||
break;
|
||||
default:
|
||||
// Ignore any other values
|
||||
}
|
||||
}
|
||||
|
||||
export const getLogger = (): Partial<Console> => {
|
||||
return _logger;
|
||||
@ -38,3 +54,12 @@ export const log = (severity: LogVerbosity, ...args: any[]): void => {
|
||||
_logger.error(...args);
|
||||
}
|
||||
};
|
||||
|
||||
const enabledTracers = process.env.GRPC_TRACE ? process.env.GRPC_TRACE.split(',') : [];
|
||||
const allEnabled = enabledTracers.includes('all');
|
||||
|
||||
export function trace(severity: LogVerbosity, tracer: string, text: string): void {
|
||||
if (allEnabled || enabledTracers.includes(tracer)) {
|
||||
log(severity, (new Date().toISOString() + ' | ' + tracer + ' | ' + text));
|
||||
}
|
||||
}
|
||||
@ -39,7 +39,7 @@ export interface MethodDefinition<RequestType, ResponseType> {
|
||||
}
|
||||
|
||||
export interface ServiceDefinition {
|
||||
[index: string]: MethodDefinition<object, object>;
|
||||
[index: string]: MethodDefinition<any, any>;
|
||||
}
|
||||
|
||||
export interface ProtobufTypeDefinition {
|
||||
|
||||
@ -28,6 +28,14 @@ import { ServiceError } from './call';
|
||||
import { Status } from './constants';
|
||||
import { StatusObject } from './call-stream';
|
||||
import { Metadata } from './metadata';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
|
||||
const TRACER_NAME = 'dns_resolver';
|
||||
|
||||
function trace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
/* These regular expressions match IP addresses with optional ports in different
|
||||
* formats. In each case, capture group 1 contains the address, and capture
|
||||
@ -159,6 +167,7 @@ class DnsResolver implements Resolver {
|
||||
private percentage: number;
|
||||
private defaultResolutionError: StatusObject;
|
||||
constructor(private target: string, private listener: ResolverListener) {
|
||||
trace('Resolver constructed for target ' + target);
|
||||
this.ipResult = parseIP(target);
|
||||
const dnsMatch = DNS_REGEX.exec(target);
|
||||
if (dnsMatch === null) {
|
||||
@ -187,6 +196,7 @@ class DnsResolver implements Resolver {
|
||||
*/
|
||||
private startResolution() {
|
||||
if (this.ipResult !== null) {
|
||||
trace('Returning IP address for target ' + this.target);
|
||||
setImmediate(() => {
|
||||
this.listener.onSuccessfulResolution(this.ipResult!, null, null);
|
||||
});
|
||||
@ -222,6 +232,7 @@ class DnsResolver implements Resolver {
|
||||
ip4Addresses,
|
||||
ip6Addresses
|
||||
);
|
||||
trace('Resolved addresses for target ' + this.target + ': ' + allAddresses);
|
||||
if (allAddresses.length === 0) {
|
||||
this.listener.onError(this.defaultResolutionError);
|
||||
return;
|
||||
@ -255,6 +266,7 @@ class DnsResolver implements Resolver {
|
||||
);
|
||||
},
|
||||
err => {
|
||||
trace('Resolution error for target ' + this.target + ': ' + (err as Error).message);
|
||||
this.pendingResultPromise = null;
|
||||
this.listener.onError(this.defaultResolutionError);
|
||||
}
|
||||
@ -263,6 +275,7 @@ class DnsResolver implements Resolver {
|
||||
}
|
||||
|
||||
updateResolution() {
|
||||
trace('Resolution update requested for target ' + this.target);
|
||||
if (this.pendingResultPromise === null) {
|
||||
this.startResolution();
|
||||
}
|
||||
|
||||
@ -32,6 +32,14 @@ import { BackoffTimeout } from './backoff-timeout';
|
||||
import { Status } from './constants';
|
||||
import { StatusObject } from './call-stream';
|
||||
import { Metadata } from './metadata';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
|
||||
const TRACER_NAME = 'resolving_load_balancer';
|
||||
|
||||
function trace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
const DEFAULT_LOAD_BALANCER_NAME = 'pick_first';
|
||||
|
||||
@ -69,6 +77,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
*/
|
||||
private innerBalancerState: ConnectivityState = ConnectivityState.IDLE;
|
||||
|
||||
private innerBalancerPicker: Picker = new UnavailablePicker();
|
||||
|
||||
/**
|
||||
* The most recent reported state of the pendingReplacementLoadBalancer.
|
||||
* Starts at IDLE for type simplicity. This should get updated as soon as the
|
||||
@ -96,6 +106,12 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
*/
|
||||
private readonly backoffTimeout: BackoffTimeout;
|
||||
|
||||
/**
|
||||
* Indicates whether we should attempt to resolve again after the backoff
|
||||
* timer runs out.
|
||||
*/
|
||||
private continueResolving = false;
|
||||
|
||||
/**
|
||||
* Wrapper class that behaves like a `LoadBalancer` and also handles name
|
||||
* resolution internally.
|
||||
@ -237,12 +253,22 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
},
|
||||
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
||||
this.innerBalancerState = connectivityState;
|
||||
if (connectivityState === ConnectivityState.IDLE) {
|
||||
picker = new QueuePicker(this);
|
||||
}
|
||||
this.innerBalancerPicker = picker;
|
||||
if (
|
||||
connectivityState !== ConnectivityState.READY &&
|
||||
this.pendingReplacementLoadBalancer !== null
|
||||
) {
|
||||
this.switchOverReplacementBalancer();
|
||||
} else {
|
||||
if (connectivityState === ConnectivityState.IDLE) {
|
||||
if (this.innerLoadBalancer) {
|
||||
this.innerLoadBalancer.destroy();
|
||||
this.innerLoadBalancer = null;
|
||||
}
|
||||
}
|
||||
this.updateState(connectivityState, picker);
|
||||
}
|
||||
},
|
||||
@ -252,8 +278,10 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
* making resolve requests, so we shouldn't make another one here.
|
||||
* In that case, the backoff timer callback will call
|
||||
* updateResolution */
|
||||
if (!this.backoffTimeout.isRunning()) {
|
||||
this.innerResolver.updateResolution();
|
||||
if (this.backoffTimeout.isRunning()) {
|
||||
this.continueResolving = true;
|
||||
} else {
|
||||
this.updateResolution();
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -270,33 +298,56 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
);
|
||||
},
|
||||
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
||||
if (connectivityState === ConnectivityState.IDLE) {
|
||||
picker = new QueuePicker(this);
|
||||
}
|
||||
this.replacementBalancerState = connectivityState;
|
||||
this.replacementBalancerPicker = picker;
|
||||
if (connectivityState === ConnectivityState.READY) {
|
||||
this.switchOverReplacementBalancer();
|
||||
} else if (connectivityState === ConnectivityState.IDLE) {
|
||||
if (this.pendingReplacementLoadBalancer) {
|
||||
this.pendingReplacementLoadBalancer.destroy();
|
||||
this.pendingReplacementLoadBalancer = null;
|
||||
}
|
||||
}
|
||||
},
|
||||
requestReresolution: () => {
|
||||
if (!this.backoffTimeout.isRunning()) {
|
||||
/* If the backoffTimeout is running, we're still backing off from
|
||||
* making resolve requests, so we shouldn't make another one here.
|
||||
* In that case, the backoff timer callback will call
|
||||
* updateResolution */
|
||||
this.innerResolver.updateResolution();
|
||||
/* If the backoffTimeout is running, we're still backing off from
|
||||
* making resolve requests, so we shouldn't make another one here.
|
||||
* In that case, the backoff timer callback will call
|
||||
* updateResolution */
|
||||
if (this.backoffTimeout.isRunning()) {
|
||||
this.continueResolving = true;
|
||||
} else {
|
||||
this.updateResolution();
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
this.backoffTimeout = new BackoffTimeout(() => {
|
||||
if (this.innerLoadBalancer === null) {
|
||||
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
||||
if (this.continueResolving) {
|
||||
this.updateResolution();
|
||||
this.continueResolving = false;
|
||||
} else {
|
||||
this.innerResolver.updateResolution();
|
||||
if (this.innerLoadBalancer === null) {
|
||||
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
||||
} else {
|
||||
this.updateState(this.innerBalancerState, this.innerBalancerPicker);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private updateResolution() {
|
||||
this.innerResolver.updateResolution();
|
||||
if (this.innerLoadBalancer === null || this.innerBalancerState === ConnectivityState.IDLE) {
|
||||
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
|
||||
}
|
||||
}
|
||||
|
||||
private updateState(connectivitystate: ConnectivityState, picker: Picker) {
|
||||
trace(this.target + ' ' + ConnectivityState[this.currentState] + ' -> ' + ConnectivityState[connectivitystate]);
|
||||
this.currentState = connectivitystate;
|
||||
this.channelControlHelper.updateState(connectivitystate, picker);
|
||||
}
|
||||
@ -314,6 +365,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
);
|
||||
this.pendingReplacementLoadBalancer = null;
|
||||
this.innerBalancerState = this.replacementBalancerState;
|
||||
this.innerBalancerPicker = this.replacementBalancerPicker;
|
||||
this.updateState(
|
||||
this.replacementBalancerState,
|
||||
this.replacementBalancerPicker
|
||||
@ -321,7 +373,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
}
|
||||
|
||||
private handleResolutionFailure(error: StatusObject) {
|
||||
if (this.innerLoadBalancer === null) {
|
||||
if (this.innerLoadBalancer === null || this.innerBalancerState === ConnectivityState.IDLE) {
|
||||
this.updateState(
|
||||
ConnectivityState.TRANSIENT_FAILURE,
|
||||
new UnavailablePicker(error)
|
||||
@ -335,7 +387,11 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
this.innerLoadBalancer.exitIdle();
|
||||
}
|
||||
if (this.currentState === ConnectivityState.IDLE) {
|
||||
this.innerResolver.updateResolution();
|
||||
if (this.backoffTimeout.isRunning()) {
|
||||
this.continueResolving = true;
|
||||
} else {
|
||||
this.updateResolution();
|
||||
}
|
||||
this.updateState(
|
||||
ConnectivityState.CONNECTING,
|
||||
new QueuePicker(this)
|
||||
|
||||
@ -24,9 +24,17 @@ import { PeerCertificate, checkServerIdentity } from 'tls';
|
||||
import { ConnectivityState } from './channel';
|
||||
import { BackoffTimeout } from './backoff-timeout';
|
||||
import { getDefaultAuthority } from './resolver';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
|
||||
const { version: clientVersion } = require('../../package.json');
|
||||
|
||||
const TRACER_NAME = 'subchannel';
|
||||
|
||||
function trace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
const MIN_CONNECT_TIMEOUT_MS = 20000;
|
||||
const INITIAL_BACKOFF_MS = 1000;
|
||||
const BACKOFF_MULTIPLIER = 1.6;
|
||||
@ -234,32 +242,50 @@ export class Subchannel {
|
||||
connectionOptions.servername = getDefaultAuthority(this.channelTarget);
|
||||
}
|
||||
}
|
||||
this.session = http2.connect(
|
||||
const session = http2.connect(
|
||||
addressScheme + this.subchannelAddress,
|
||||
connectionOptions
|
||||
);
|
||||
this.session.unref();
|
||||
this.session.once('connect', () => {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.CONNECTING],
|
||||
ConnectivityState.READY
|
||||
);
|
||||
this.session = session;
|
||||
session.unref();
|
||||
/* For all of these events, check if the session at the time of the event
|
||||
* is the same one currently attached to this subchannel, to ensure that
|
||||
* old events from previous connection attempts cannot cause invalid state
|
||||
* transitions. */
|
||||
session.once('connect', () => {
|
||||
if (this.session === session) {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.CONNECTING],
|
||||
ConnectivityState.READY
|
||||
);
|
||||
}
|
||||
});
|
||||
this.session.once('close', () => {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.CONNECTING, ConnectivityState.READY],
|
||||
ConnectivityState.TRANSIENT_FAILURE
|
||||
);
|
||||
session.once('close', () => {
|
||||
if (this.session === session) {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.CONNECTING],
|
||||
ConnectivityState.TRANSIENT_FAILURE
|
||||
);
|
||||
/* Transitioning directly to IDLE here should be OK because we are not
|
||||
* doing any backoff, because a connection was established at some
|
||||
* point */
|
||||
this.transitionToState(
|
||||
[ConnectivityState.READY],
|
||||
ConnectivityState.IDLE
|
||||
);
|
||||
}
|
||||
});
|
||||
this.session.once('goaway', () => {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.CONNECTING, ConnectivityState.READY],
|
||||
ConnectivityState.IDLE
|
||||
);
|
||||
session.once('goaway', () => {
|
||||
if (this.session === session) {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.CONNECTING, ConnectivityState.READY],
|
||||
ConnectivityState.IDLE
|
||||
);
|
||||
}
|
||||
});
|
||||
this.session.once('error', error => {
|
||||
session.once('error', error => {
|
||||
/* Do nothing here. Any error should also trigger a close event, which is
|
||||
* where we want to handle that. */
|
||||
* where we want to handle that. */
|
||||
});
|
||||
}
|
||||
|
||||
@ -277,6 +303,7 @@ export class Subchannel {
|
||||
if (oldStates.indexOf(this.connectivityState) === -1) {
|
||||
return false;
|
||||
}
|
||||
trace(this.subchannelAddress + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
|
||||
const previousState = this.connectivityState;
|
||||
this.connectivityState = newState;
|
||||
switch (newState) {
|
||||
@ -464,4 +491,8 @@ export class Subchannel {
|
||||
ConnectivityState.CONNECTING
|
||||
);
|
||||
}
|
||||
|
||||
getAddress(): string {
|
||||
return this.subchannelAddress;
|
||||
}
|
||||
}
|
||||
|
||||
@ -160,7 +160,6 @@ describe('Cancellation', () => {
|
||||
call.on('error', (error: ServiceError) => {
|
||||
assert.strictEqual(error.code, grpc.status.CANCELLED);
|
||||
assert.strictEqual(error.details, 'Cancelled on client');
|
||||
assert.strictEqual(error.message, 'Cancelled on client');
|
||||
waitForServerCancel();
|
||||
});
|
||||
|
||||
|
||||
@ -82,12 +82,10 @@
|
||||
'deps/grpc/third_party/address_sorting/include',
|
||||
'deps/grpc/third_party/cares',
|
||||
'deps/grpc/third_party/cares/cares',
|
||||
'deps/grpc/third_party/nanopb',
|
||||
'deps/grpc/third_party/upb',
|
||||
'deps/grpc/third_party/upb/generated_for_cmake',
|
||||
],
|
||||
'defines': [
|
||||
'PB_FIELD_32BIT',
|
||||
'GPR_BACKWARDS_COMPATIBILITY_MODE',
|
||||
'GRPC_ARES=1',
|
||||
'GRPC_UV',
|
||||
|
||||
@ -72,7 +72,6 @@
|
||||
"deps/grpc/third_party/boringssl/include/**/*.{c,cc,h}",
|
||||
"deps/grpc/third_party/boringssl/ssl/**/*.{c,cc,h}",
|
||||
"deps/grpc/third_party/boringssl/third_party/**/*.{c,h}",
|
||||
"deps/grpc/third_party/nanopb/*.{c,cc,h}",
|
||||
"deps/grpc/third_party/upb/**/*.{c,h}",
|
||||
"deps/grpc/third_party/zlib/**/*.{c,cc,h}",
|
||||
"binding.gyp"
|
||||
|
||||
@ -78,12 +78,10 @@
|
||||
'deps/grpc/third_party/address_sorting/include',
|
||||
'deps/grpc/third_party/cares',
|
||||
'deps/grpc/third_party/cares/cares',
|
||||
'deps/grpc/third_party/nanopb',
|
||||
'deps/grpc/third_party/upb',
|
||||
'deps/grpc/third_party/upb/generated_for_cmake',
|
||||
],
|
||||
'defines': [
|
||||
'PB_FIELD_32BIT',
|
||||
'GPR_BACKWARDS_COMPATIBILITY_MODE',
|
||||
'GRPC_ARES=1',
|
||||
'GRPC_UV',
|
||||
|
||||
@ -74,7 +74,6 @@
|
||||
"deps/grpc/third_party/boringssl/include/**/*.{c,cc,h}",
|
||||
"deps/grpc/third_party/boringssl/ssl/**/*.{c,cc,h}",
|
||||
"deps/grpc/third_party/boringssl/third_party/**/*.{c,h}",
|
||||
"deps/grpc/third_party/nanopb/*.{c,cc,h}",
|
||||
"deps/grpc/third_party/upb/**/*.{c,h}",
|
||||
"deps/grpc/third_party/zlib/**/*.{c,cc,h}",
|
||||
"binding.gyp"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user