From b4d848865d025d6935fd747de89e515bf04dae41 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 28 Aug 2019 18:03:57 -0700 Subject: [PATCH] Lint fixes --- packages/grpc-js/src/backoff-timeout.ts | 16 +- packages/grpc-js/src/call-credentials.ts | 6 +- packages/grpc-js/src/channel-credentials.ts | 31 +++- packages/grpc-js/src/channel-options.ts | 7 +- packages/grpc-js/src/channel.ts | 175 ++++++++++++------ packages/grpc-js/src/client.ts | 6 +- packages/grpc-js/src/compression-filter.ts | 4 +- .../grpc-js/src/load-balancer-pick-first.ts | 117 +++++++++--- packages/grpc-js/src/load-balancer.ts | 38 ++-- packages/grpc-js/src/load-balancing-config.ts | 27 +-- packages/grpc-js/src/picker.ts | 26 +-- packages/grpc-js/src/resolver-dns.ts | 123 +++++++----- packages/grpc-js/src/resolver.ts | 43 +++-- .../grpc-js/src/resolving-load-balancer.ts | 143 ++++++++++---- packages/grpc-js/src/service-config.ts | 108 +++++++---- packages/grpc-js/src/subchannel-pool.ts | 73 ++++++-- packages/grpc-js/src/subchannel.ts | 144 ++++++++------ packages/grpc-js/test/test-call-stream.ts | 4 +- 18 files changed, 736 insertions(+), 355 deletions(-) diff --git a/packages/grpc-js/src/backoff-timeout.ts b/packages/grpc-js/src/backoff-timeout.ts index 84956c43..8cad14f7 100644 --- a/packages/grpc-js/src/backoff-timeout.ts +++ b/packages/grpc-js/src/backoff-timeout.ts @@ -22,8 +22,8 @@ const BACKOFF_JITTER = 0.2; /** * Get a number uniformly at random in the range [min, max) - * @param min - * @param max + * @param min + * @param max */ function uniformRandom(min: number, max: number) { return Math.random() * (max - min) + min; @@ -43,7 +43,7 @@ export class BackoffTimeout { private jitter: number = BACKOFF_JITTER; private nextDelay: number; private timerId: NodeJS.Timer; - private running: boolean = false; + private running = false; constructor(private callback: () => void, options?: BackoffOptions) { if (options) { @@ -74,9 +74,13 @@ export class BackoffTimeout { this.callback(); this.running = false; }, this.nextDelay); - const nextBackoff = Math.min(this.nextDelay * this.multiplier, this.maxDelay); + const nextBackoff = Math.min( + this.nextDelay * this.multiplier, + this.maxDelay + ); const jitterMagnitude = nextBackoff * this.jitter; - this.nextDelay = nextBackoff + uniformRandom(-jitterMagnitude, jitterMagnitude); + this.nextDelay = + nextBackoff + uniformRandom(-jitterMagnitude, jitterMagnitude); } /** @@ -98,4 +102,4 @@ export class BackoffTimeout { isRunning() { return this.running; } -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/call-credentials.ts b/packages/grpc-js/src/call-credentials.ts index fd32baff..e38672ae 100644 --- a/packages/grpc-js/src/call-credentials.ts +++ b/packages/grpc-js/src/call-credentials.ts @@ -96,7 +96,9 @@ class ComposedCallCredentials extends CallCredentials { return true; } if (other instanceof ComposedCallCredentials) { - return this.creds.every((value, index) => value._equals(other.creds[index])); + return this.creds.every((value, index) => + value._equals(other.creds[index]) + ); } else { return false; } @@ -134,7 +136,7 @@ class SingleCallCredentials extends CallCredentials { return false; } } - } +} class EmptyCallCredentials extends CallCredentials { generateMetadata(options: CallMetadataOptions): Promise { diff --git a/packages/grpc-js/src/channel-credentials.ts b/packages/grpc-js/src/channel-credentials.ts index 345c99bf..c29885fa 100644 --- a/packages/grpc-js/src/channel-credentials.ts +++ b/packages/grpc-js/src/channel-credentials.ts @@ -140,7 +140,12 @@ export abstract class ChannelCredentials { 'Certificate chain must be given with accompanying private key' ); } - return new SecureChannelCredentialsImpl(rootCerts || null, privateKey || null, certChain || null, verifyOptions || {}); + return new SecureChannelCredentialsImpl( + rootCerts || null, + privateKey || null, + certChain || null, + verifyOptions || {} + ); } /** @@ -224,7 +229,10 @@ class SecureChannelCredentialsImpl extends ChannelCredentials { if (!bufferOrNullEqual(this.certChain, other.certChain)) { return false; } - return this.verifyOptions.checkServerIdentity === other.verifyOptions.checkServerIdentity; + return ( + this.verifyOptions.checkServerIdentity === + other.verifyOptions.checkServerIdentity + ); } else { return false; } @@ -232,12 +240,20 @@ class SecureChannelCredentialsImpl extends ChannelCredentials { } class ComposedChannelCredentialsImpl extends ChannelCredentials { - constructor (private channelCredentials: SecureChannelCredentialsImpl, callCreds: CallCredentials) { + constructor( + private channelCredentials: SecureChannelCredentialsImpl, + callCreds: CallCredentials + ) { super(callCreds); } compose(callCredentials: CallCredentials) { - const combinedCallCredentials = this.callCredentials.compose(callCredentials); - return new ComposedChannelCredentialsImpl(this.channelCredentials, combinedCallCredentials); + const combinedCallCredentials = this.callCredentials.compose( + callCredentials + ); + return new ComposedChannelCredentialsImpl( + this.channelCredentials, + combinedCallCredentials + ); } _getConnectionOptions(): ConnectionOptions | null { @@ -251,7 +267,10 @@ class ComposedChannelCredentialsImpl extends ChannelCredentials { return true; } if (other instanceof ComposedChannelCredentialsImpl) { - return this.channelCredentials._equals(other.channelCredentials) && this.callCredentials._equals(other.callCredentials); + return ( + this.channelCredentials._equals(other.channelCredentials) && + this.callCredentials._equals(other.callCredentials) + ); } else { return false; } diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index b54b69cb..b7145970 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -41,13 +41,16 @@ export const recognizedOptions = { 'grpc.keepalive_timeout_ms': true, }; -export function channelOptionsEqual(options1: ChannelOptions, options2: ChannelOptions) { +export function channelOptionsEqual( + options1: ChannelOptions, + options2: ChannelOptions +) { const keys1 = Object.keys(options1).sort(); const keys2 = Object.keys(options2).sort(); if (keys1.length !== keys2.length) { return false; } - for (let i = 0; i < keys1.length; i+=1) { + for (let i = 0; i < keys1.length; i += 1) { if (keys1[i] !== keys2[i]) { return false; } diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 331a34f6..1a2a1db6 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -15,23 +15,28 @@ * */ -import { Deadline, Call, Http2CallStream, CallStreamOptions } from "./call-stream"; -import { ChannelCredentials } from "./channel-credentials"; -import { ChannelOptions } from "./channel-options"; -import { ResolvingLoadBalancer } from "./resolving-load-balancer"; -import { SubchannelPool, getSubchannelPool } from "./subchannel-pool"; -import { ChannelControlHelper } from "./load-balancer"; -import { UnavailablePicker, Picker, PickResultType } from "./picker"; -import { Metadata } from "./metadata"; -import { Status } from "./constants"; -import { FilterStackFactory } from "./filter-stack"; -import { CallCredentialsFilterFactory } from "./call-credentials-filter"; -import { DeadlineFilterFactory } from "./deadline-filter"; -import { MetadataStatusFilterFactory } from "./metadata-status-filter"; -import { CompressionFilterFactory } from "./compression-filter"; -import { getDefaultAuthority } from "./resolver"; -import { LoadBalancingConfig } from "./load-balancing-config"; -import { ServiceConfig } from "./service-config"; +import { + Deadline, + Call, + Http2CallStream, + CallStreamOptions, +} from './call-stream'; +import { ChannelCredentials } from './channel-credentials'; +import { ChannelOptions } from './channel-options'; +import { ResolvingLoadBalancer } from './resolving-load-balancer'; +import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; +import { ChannelControlHelper } from './load-balancer'; +import { UnavailablePicker, Picker, PickResultType } from './picker'; +import { Metadata } from './metadata'; +import { Status } from './constants'; +import { FilterStackFactory } from './filter-stack'; +import { CallCredentialsFilterFactory } from './call-credentials-filter'; +import { DeadlineFilterFactory } from './deadline-filter'; +import { MetadataStatusFilterFactory } from './metadata-status-filter'; +import { CompressionFilterFactory } from './compression-filter'; +import { getDefaultAuthority } from './resolver'; +import { LoadBalancingConfig } from './load-balancing-config'; +import { ServiceConfig } from './service-config'; export enum ConnectivityState { CONNECTING, @@ -111,37 +116,58 @@ export class ChannelImplementation implements Channel { private subchannelPool: SubchannelPool; private connectivityState: ConnectivityState = ConnectivityState.IDLE; private currentPicker: Picker = new UnavailablePicker(); - private pickQueue: {callStream: Http2CallStream, callMetadata: Metadata}[] = []; + private pickQueue: Array<{ + callStream: Http2CallStream; + callMetadata: Metadata; + }> = []; private connectivityStateWatchers: ConnectivityStateWatcher[] = []; private defaultAuthority: string; private filterStackFactory: FilterStackFactory; - constructor(private target: string, private readonly credentials: ChannelCredentials, private readonly options: ChannelOptions) { + constructor( + private target: string, + private readonly credentials: ChannelCredentials, + private readonly options: ChannelOptions + ) { // TODO: check channel arg for getting a private pool this.subchannelPool = getSubchannelPool(true); const channelControlHelper: ChannelControlHelper = { - createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => { - return this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials); + createSubchannel: ( + subchannelAddress: string, + subchannelArgs: ChannelOptions + ) => { + return this.subchannelPool.getOrCreateSubchannel( + this.target, + subchannelAddress, + Object.assign({}, this.options, subchannelArgs), + this.credentials + ); }, updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.currentPicker = picker; const queueCopy = this.pickQueue.slice(); this.pickQueue = []; - for (const {callStream, callMetadata} of queueCopy) { + for (const { callStream, callMetadata } of queueCopy) { this.tryPick(callStream, callMetadata); } this.updateState(connectivityState); }, requestReresolution: () => { // This should never be called. - throw new Error('Resolving load balancer should never call requestReresolution'); - } + throw new Error( + 'Resolving load balancer should never call requestReresolution' + ); + }, }; // TODO: check channel arg for default service config const defaultServiceConfig: ServiceConfig = { loadBalancingConfig: [], - methodConfig: [] - } - this.resolvingLoadBalancer = new ResolvingLoadBalancer(target, channelControlHelper, defaultServiceConfig); + methodConfig: [], + }; + this.resolvingLoadBalancer = new ResolvingLoadBalancer( + target, + channelControlHelper, + defaultServiceConfig + ); this.filterStackFactory = new FilterStackFactory([ new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this), @@ -160,50 +186,79 @@ export class ChannelImplementation implements Channel { * Check the picker output for the given call and corresponding metadata, * and take any relevant actions. Should not be called while iterating * over pickQueue. - * @param callStream - * @param callMetadata + * @param callStream + * @param callMetadata */ private tryPick(callStream: Http2CallStream, callMetadata: Metadata) { - const pickResult = this.currentPicker.pick({metadata: callMetadata}); - switch(pickResult.pickResultType) { + const pickResult = this.currentPicker.pick({ metadata: callMetadata }); + switch (pickResult.pickResultType) { case PickResultType.COMPLETE: if (pickResult.subchannel === null) { - callStream.cancelWithStatus(Status.UNAVAILABLE, "Request dropped by load balancing policy"); + callStream.cancelWithStatus( + Status.UNAVAILABLE, + 'Request dropped by load balancing policy' + ); // End the call with an error } else { /* If the subchannel disconnects between calling pick and getting * the filter stack metadata, the call will end with an error. */ - callStream.filterStack.sendMetadata(Promise.resolve(new Metadata())).then((finalMetadata) => { - if (pickResult.subchannel!.getConnectivityState() === ConnectivityState.READY) { - pickResult.subchannel!.startCallStream(callMetadata, callStream); - } else { - callStream.cancelWithStatus(Status.UNAVAILABLE, 'Connection dropped while starting call'); - } - }, - (error: Error & { code: number }) => { - // We assume the error code isn't 0 (Status.OK) - callStream.cancelWithStatus( - error.code || Status.UNKNOWN, - `Getting metadata from plugin failed with error: ${error.message}` + callStream.filterStack + .sendMetadata(Promise.resolve(new Metadata())) + .then( + finalMetadata => { + if ( + pickResult.subchannel!.getConnectivityState() === + ConnectivityState.READY + ) { + pickResult.subchannel!.startCallStream( + callMetadata, + callStream + ); + } else { + callStream.cancelWithStatus( + Status.UNAVAILABLE, + 'Connection dropped while starting call' + ); + } + }, + (error: Error & { code: number }) => { + // We assume the error code isn't 0 (Status.OK) + callStream.cancelWithStatus( + error.code || Status.UNKNOWN, + `Getting metadata from plugin failed with error: ${ + error.message + }` + ); + } ); - }); } break; case PickResultType.QUEUE: - this.pickQueue.push({callStream, callMetadata}); + this.pickQueue.push({ callStream, callMetadata }); break; case PickResultType.TRANSIENT_FAILURE: if (callMetadata.getOptions().waitForReady) { - this.pickQueue.push({callStream, callMetadata}); + this.pickQueue.push({ callStream, callMetadata }); } else { - callStream.cancelWithStatus(pickResult.status!.code, pickResult.status!.details); + callStream.cancelWithStatus( + pickResult.status!.code, + pickResult.status!.details + ); } break; + default: + throw new Error( + `Invalid state: unknown pickResultType ${pickResult.pickResultType}` + ); } } - private removeConnectivityStateWatcher(watcherObject: ConnectivityStateWatcher) { - const watcherIndex = this.connectivityStateWatchers.findIndex((value) => value === watcherObject); + private removeConnectivityStateWatcher( + watcherObject: ConnectivityStateWatcher + ) { + const watcherIndex = this.connectivityStateWatchers.findIndex( + value => value === watcherObject + ); if (watcherIndex >= 0) { this.connectivityStateWatchers.splice(watcherIndex, 1); } @@ -237,25 +292,31 @@ export class ChannelImplementation implements Channel { getConnectivityState() { return this.connectivityState; } - + watchConnectivityState( currentState: ConnectivityState, deadline: Date | number, callback: (error?: Error) => void ): void { - const deadlineDate: Date = deadline instanceof Date ? deadline : new Date(deadline); + const deadlineDate: Date = + deadline instanceof Date ? deadline : new Date(deadline); const now = new Date(); if (deadlineDate <= now) { - process.nextTick(callback, new Error('Deadline passed without connectivity state change')); + process.nextTick( + callback, + new Error('Deadline passed without connectivity state change') + ); return; } const watcherObject = { - currentState, + currentState, callback, timer: setTimeout(() => { this.removeConnectivityStateWatcher(watcherObject); - callback(new Error('Deadline passed without connectivity state change')); - }, deadlineDate.getTime() - now.getTime()) + callback( + new Error('Deadline passed without connectivity state change') + ); + }, deadlineDate.getTime() - now.getTime()), }; this.connectivityStateWatchers.push(watcherObject); } @@ -286,4 +347,4 @@ export class ChannelImplementation implements Channel { ); return stream; } -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index 87c5c5f2..0e2cffe7 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -78,7 +78,11 @@ export class Client { options ); } else { - this[CHANNEL_SYMBOL] = new ChannelImplementation(address, credentials, options); + this[CHANNEL_SYMBOL] = new ChannelImplementation( + address, + credentials, + options + ); } } diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index ff2dcbeb..f28568e7 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -138,7 +138,9 @@ class UnknownHandler extends CompressionHandler { compressMessage(message: Buffer): Promise { return Promise.reject( new Error( - `Received message compressed wth unsupported compression method ${this.compressionName}` + `Received message compressed wth unsupported compression method ${ + this.compressionName + }` ) ); } diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 047aad71..c8e220b8 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -15,9 +15,20 @@ * */ -import { LoadBalancer, ChannelControlHelper, registerLoadBalancerType } from './load-balancer'; +import { + LoadBalancer, + ChannelControlHelper, + registerLoadBalancerType, +} from './load-balancer'; import { ConnectivityState } from './channel'; -import { QueuePicker, Picker, PickArgs, CompletePickResult, PickResultType, UnavailablePicker } from './picker'; +import { + QueuePicker, + Picker, + PickArgs, + CompletePickResult, + PickResultType, + UnavailablePicker, +} from './picker'; import { LoadBalancingConfig } from './load-balancing-config'; import { Subchannel, ConnectivityStateListener } from './subchannel'; @@ -36,12 +47,12 @@ const CONNECTION_DELAY_INTERVAL_MS = 250; class PickFirstPicker implements Picker { constructor(private subchannel: Subchannel) {} - pick(pickArgs: PickArgs) : CompletePickResult { + pick(pickArgs: PickArgs): CompletePickResult { return { pickResultType: PickResultType.COMPLETE, subchannel: this.subchannel, - status: null - } + status: null, + }; } } @@ -63,12 +74,12 @@ export class PickFirstLoadBalancer implements LoadBalancer { * The index within the `subchannels` array of the subchannel with the most * recently started connection attempt. */ - private currentSubchannelIndex: number = 0; + private currentSubchannelIndex = 0; /** * The number of subchannels in the `subchannels` list currently in the * CONNECTING state. Used to determine the overall load balancer state. */ - private subchannelConnectingCount: number = 0; + private subchannelConnectingCount = 0; /** * The currently picked subchannel used for making calls. Populated if * and only if the load balancer's current state is READY. In that case, @@ -85,11 +96,11 @@ export class PickFirstLoadBalancer implements LoadBalancer { */ private pickedSubchannelStateListener: ConnectivityStateListener; /** - * Timer reference for the timer tracking when to start + * Timer reference for the timer tracking when to start */ private connectionDelayTimeout: NodeJS.Timeout; - private triedAllSubchannels: boolean = false; + private triedAllSubchannels = false; /** * Load balancer that attempts to connect to each backend in the address list @@ -100,7 +111,11 @@ export class PickFirstLoadBalancer implements LoadBalancer { */ constructor(private channelControlHelper: ChannelControlHelper) { this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); - this.subchannelStateListener = (subchannel: Subchannel, previousState: ConnectivityState, newState: ConnectivityState) => { + this.subchannelStateListener = ( + subchannel: Subchannel, + previousState: ConnectivityState, + newState: ConnectivityState + ) => { if (previousState === ConnectivityState.CONNECTING) { this.subchannelConnectingCount -= 1; } @@ -112,7 +127,10 @@ export class PickFirstLoadBalancer implements LoadBalancer { return; } else { if (this.currentPick === null) { - if (newState === ConnectivityState.TRANSIENT_FAILURE || newState === ConnectivityState.IDLE) { + if ( + newState === ConnectivityState.TRANSIENT_FAILURE || + newState === ConnectivityState.IDLE + ) { process.nextTick(() => { subchannel.startConnecting(); }); @@ -121,11 +139,17 @@ export class PickFirstLoadBalancer implements LoadBalancer { * to goes into TRANSIENT_FAILURE, immediately try to start * connecting to the next one instead of waiting for the connection * delay timer. */ - if (subchannel === this.subchannels[this.currentSubchannelIndex] && newState === ConnectivityState.TRANSIENT_FAILURE) { + if ( + subchannel === this.subchannels[this.currentSubchannelIndex] && + newState === ConnectivityState.TRANSIENT_FAILURE + ) { this.startNextSubchannelConnecting(); } if (this.triedAllSubchannels) { - const newLBState = this.subchannelConnectingCount > 0 ? ConnectivityState.CONNECTING : ConnectivityState.TRANSIENT_FAILURE; + const newLBState = + this.subchannelConnectingCount > 0 + ? ConnectivityState.CONNECTING + : ConnectivityState.TRANSIENT_FAILURE; if (newLBState !== this.currentState) { if (newLBState === ConnectivityState.TRANSIENT_FAILURE) { this.updateState(newLBState, new UnavailablePicker()); @@ -134,17 +158,29 @@ export class PickFirstLoadBalancer implements LoadBalancer { } } } else { - this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); + this.updateState( + ConnectivityState.CONNECTING, + new QueuePicker(this) + ); } } } }; - this.pickedSubchannelStateListener = (subchannel: Subchannel, previousState: ConnectivityState, newState: ConnectivityState) => { + this.pickedSubchannelStateListener = ( + subchannel: Subchannel, + previousState: ConnectivityState, + newState: ConnectivityState + ) => { if (newState !== ConnectivityState.READY) { subchannel.unref(); - subchannel.removeConnectivityStateListener(this.pickedSubchannelStateListener); + subchannel.removeConnectivityStateListener( + this.pickedSubchannelStateListener + ); if (this.subchannels.length > 0) { - const newLBState = this.subchannelConnectingCount > 0 ? ConnectivityState.CONNECTING : ConnectivityState.TRANSIENT_FAILURE; + const newLBState = + this.subchannelConnectingCount > 0 + ? ConnectivityState.CONNECTING + : ConnectivityState.TRANSIENT_FAILURE; if (newLBState === ConnectivityState.TRANSIENT_FAILURE) { this.updateState(newLBState, new UnavailablePicker()); } else { @@ -160,7 +196,6 @@ export class PickFirstLoadBalancer implements LoadBalancer { clearTimeout(this.connectionDelayTimeout); } - private startNextSubchannelConnecting() { if (this.triedAllSubchannels) { return; @@ -168,7 +203,10 @@ export class PickFirstLoadBalancer implements LoadBalancer { for (const [index, subchannel] of this.subchannels.entries()) { if (index > this.currentSubchannelIndex) { const subchannelState = subchannel.getConnectivityState(); - if (subchannelState === ConnectivityState.IDLE || subchannelState === ConnectivityState.CONNECTING) { + if ( + subchannelState === ConnectivityState.IDLE || + subchannelState === ConnectivityState.CONNECTING + ) { this.startConnecting(index); return; } @@ -184,20 +222,25 @@ export class PickFirstLoadBalancer implements LoadBalancer { private startConnecting(subchannelIndex: number) { clearTimeout(this.connectionDelayTimeout); this.currentSubchannelIndex = subchannelIndex; - if (this.subchannels[subchannelIndex].getConnectivityState() === ConnectivityState.IDLE) { + if ( + this.subchannels[subchannelIndex].getConnectivityState() === + ConnectivityState.IDLE + ) { process.nextTick(() => { this.subchannels[subchannelIndex].startConnecting(); }); } this.connectionDelayTimeout = setTimeout(() => { this.startNextSubchannelConnecting(); - }, CONNECTION_DELAY_INTERVAL_MS) + }, CONNECTION_DELAY_INTERVAL_MS); } private pickSubchannel(subchannel: Subchannel) { if (this.currentPick !== null) { this.currentPick.unref(); - this.currentPick.removeConnectivityStateListener(this.pickedSubchannelStateListener); + this.currentPick.removeConnectivityStateListener( + this.pickedSubchannelStateListener + ); } this.currentPick = subchannel; this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel)); @@ -229,7 +272,9 @@ export class PickFirstLoadBalancer implements LoadBalancer { */ private connectToAddressList(): void { this.resetSubchannelList(); - this.subchannels = this.latestAddressList.map((address) => this.channelControlHelper.createSubchannel(address, {})); + this.subchannels = this.latestAddressList.map(address => + this.channelControlHelper.createSubchannel(address, {}) + ); for (const subchannel of this.subchannels) { subchannel.ref(); } @@ -237,24 +282,36 @@ export class PickFirstLoadBalancer implements LoadBalancer { subchannel.addConnectivityStateListener(this.subchannelStateListener); if (subchannel.getConnectivityState() === ConnectivityState.READY) { this.pickSubchannel(subchannel); - this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel)); + this.updateState( + ConnectivityState.READY, + new PickFirstPicker(subchannel) + ); this.resetSubchannelList(); return; } } for (const [index, subchannel] of this.subchannels.entries()) { const subchannelState = subchannel.getConnectivityState(); - if (subchannelState === ConnectivityState.IDLE || subchannelState === ConnectivityState.CONNECTING) { + if ( + subchannelState === ConnectivityState.IDLE || + subchannelState === ConnectivityState.CONNECTING + ) { this.startConnecting(index); this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); return; } } // If the code reaches this point, every subchannel must be in TRANSIENT_FAILURE - this.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker()); + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); } - updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null): void { + updateAddressList( + addressList: string[], + lbConfig: LoadBalancingConfig | null + ): void { // lbConfig has no useful information for pick first load balancing this.latestAddressList = addressList; this.connectToAddressList(); @@ -277,7 +334,9 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.resetSubchannelList(); if (this.currentPick !== null) { this.currentPick.unref(); - this.currentPick.removeConnectivityStateListener(this.pickedSubchannelStateListener); + this.currentPick.removeConnectivityStateListener( + this.pickedSubchannelStateListener + ); } } @@ -292,4 +351,4 @@ export class PickFirstLoadBalancer implements LoadBalancer { export function setup(): void { registerLoadBalancerType(TYPE_NAME, PickFirstLoadBalancer); -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index 7f0ca2ca..a74deea7 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -15,11 +15,11 @@ * */ -import { ChannelOptions } from "./channel-options"; -import { Subchannel } from "./subchannel"; -import { ConnectivityState } from "./channel"; -import { Picker } from "./picker"; -import { LoadBalancingConfig } from "./load-balancing-config"; +import { ChannelOptions } from './channel-options'; +import { Subchannel } from './subchannel'; +import { ConnectivityState } from './channel'; +import { Picker } from './picker'; +import { LoadBalancingConfig } from './load-balancing-config'; import * as load_balancer_pick_first from './load-balancer-pick-first'; /** @@ -32,7 +32,10 @@ export interface ChannelControlHelper { * @param subchannelAddress The address to connect to * @param subchannelArgs Extra channel arguments specified by the load balancer */ - createSubchannel(subchannelAddress: string, subchannelArgs: ChannelOptions): Subchannel; + createSubchannel( + subchannelAddress: string, + subchannelArgs: ChannelOptions + ): Subchannel; /** * Passes a new subchannel picker up to the channel. This is called if either * the connectivity state changes or if a different picker is needed for any @@ -61,7 +64,10 @@ export interface LoadBalancer { * @param lbConfig The load balancing config object from the service config, * if one was provided */ - updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null): void; + updateAddressList( + addressList: string[], + lbConfig: LoadBalancingConfig | null + ): void; /** * If the load balancer is currently in the IDLE state, start connecting. */ @@ -91,16 +97,24 @@ export interface LoadBalancer { } export interface LoadBalancerConstructor { - new(channelControlHelper: ChannelControlHelper): LoadBalancer; + new (channelControlHelper: ChannelControlHelper): LoadBalancer; } -const registeredLoadBalancerTypes: {[name: string]: LoadBalancerConstructor} = {}; +const registeredLoadBalancerTypes: { + [name: string]: LoadBalancerConstructor; +} = {}; -export function registerLoadBalancerType(typeName: string, loadBalancerType: LoadBalancerConstructor) { +export function registerLoadBalancerType( + typeName: string, + loadBalancerType: LoadBalancerConstructor +) { registeredLoadBalancerTypes[typeName] = loadBalancerType; } -export function createLoadBalancer(typeName: string, channelControlHelper: ChannelControlHelper): LoadBalancer | null { +export function createLoadBalancer( + typeName: string, + channelControlHelper: ChannelControlHelper +): LoadBalancer | null { if (typeName in registeredLoadBalancerTypes) { return new registeredLoadBalancerTypes[typeName](channelControlHelper); } else { @@ -114,4 +128,4 @@ export function isLoadBalancerNameRegistered(typeName: string): boolean { export function registerAll() { load_balancer_pick_first.setup(); -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/load-balancing-config.ts b/packages/grpc-js/src/load-balancing-config.ts index df74cdcb..8607f1fc 100644 --- a/packages/grpc-js/src/load-balancing-config.ts +++ b/packages/grpc-js/src/load-balancing-config.ts @@ -21,10 +21,11 @@ * specific object type if the input has the right structure, and throws an * error otherwise. */ -import { isString, isArray } from "util"; +/* The any type is purposely used here. All functions validate their input at + * runtime */ +/* tslint:disable:no-any */ -export interface RoundRobinConfig { -} +export interface RoundRobinConfig {} export interface XdsConfig { balancerName: string; @@ -48,16 +49,16 @@ export interface LoadBalancingConfig { * effectively */ function validateXdsConfig(xds: any): XdsConfig { - if (!('balancerName' in xds) || !isString(xds.balancerName)) { + if (!('balancerName' in xds) || typeof xds.balancerName !== 'string') { throw new Error('Invalid xds config: invalid balancerName'); } const xdsConfig: XdsConfig = { balancerName: xds.balancerName, childPolicy: [], - fallbackPolicy: [] + fallbackPolicy: [], }; if ('childPolicy' in xds) { - if (!isArray(xds.childPolicy)) { + if (!Array.isArray(xds.childPolicy)) { throw new Error('Invalid xds config: invalid childPolicy'); } for (const policy of xds.childPolicy) { @@ -65,7 +66,7 @@ function validateXdsConfig(xds: any): XdsConfig { } } if ('fallbackPolicy' in xds) { - if (!isArray(xds.fallbackPolicy)) { + if (!Array.isArray(xds.fallbackPolicy)) { throw new Error('Invalid xds config: invalid fallbackPolicy'); } for (const policy of xds.fallbackPolicy) { @@ -77,10 +78,10 @@ function validateXdsConfig(xds: any): XdsConfig { function validateGrpcLbConfig(grpclb: any): GrpcLbConfig { const grpcLbConfig: GrpcLbConfig = { - childPolicy: [] + childPolicy: [], }; if ('childPolicy' in grpclb) { - if (!isArray(grpclb.childPolicy)) { + if (!Array.isArray(grpclb.childPolicy)) { throw new Error('Invalid xds config: invalid childPolicy'); } for (const policy of grpclb.childPolicy) { @@ -96,17 +97,17 @@ export function validateConfig(obj: any): LoadBalancingConfig { throw new Error('Multiple load balancing policies configured'); } if (obj['round_robin'] instanceof Object) { - return { round_robin: {} } + return { round_robin: {} }; } } if ('xds' in obj) { if ('grpclb' in obj) { throw new Error('Multiple load balancing policies configured'); } - return {xds: validateXdsConfig(obj.xds)}; + return { xds: validateXdsConfig(obj.xds) }; } if ('grpclb' in obj) { - return {grpclb: validateGrpcLbConfig(obj.grpclb)}; + return { grpclb: validateGrpcLbConfig(obj.grpclb) }; } throw new Error('No recognized load balancing policy configured'); -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index ebed1df4..94b8fbe2 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -15,16 +15,16 @@ * */ -import { Subchannel } from "./subchannel"; -import { StatusObject } from "./call-stream"; -import { Metadata } from "./metadata"; -import { Status } from "./constants"; -import { LoadBalancer } from "./load-balancer"; +import { Subchannel } from './subchannel'; +import { StatusObject } from './call-stream'; +import { Metadata } from './metadata'; +import { Status } from './constants'; +import { LoadBalancer } from './load-balancer'; export enum PickResultType { COMPLETE, QUEUE, - TRANSIENT_FAILURE + TRANSIENT_FAILURE, } export interface PickResult { @@ -85,8 +85,8 @@ export class UnavailablePicker implements Picker { } else { this.status = { code: Status.UNAVAILABLE, - details: "No connection established", - metadata: new Metadata() + details: 'No connection established', + metadata: new Metadata(), }; } } @@ -94,7 +94,7 @@ export class UnavailablePicker implements Picker { return { pickResultType: PickResultType.TRANSIENT_FAILURE, subchannel: null, - status: this.status + status: this.status, }; } } @@ -107,7 +107,7 @@ export class UnavailablePicker implements Picker { * once any pick is attempted. */ export class QueuePicker { - private calledExitIdle: boolean = false; + private calledExitIdle = false; // Constructed with a load balancer. Calls exitIdle on it the first time pick is called constructor(private loadBalancer: LoadBalancer) {} @@ -119,7 +119,7 @@ export class QueuePicker { return { pickResultType: PickResultType.QUEUE, subchannel: null, - status: null - } + status: null, + }; } -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index e0c397c9..ff91a8f2 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -14,7 +14,12 @@ * limitations under the License. */ -import { Resolver, ResolverListener, registerResolver, registerDefaultResolver } from './resolver'; +import { + Resolver, + ResolverListener, + registerResolver, + registerDefaultResolver, +} from './resolver'; import * as dns from 'dns'; import * as util from 'util'; import { extractAndSelectServiceConfig, ServiceConfig } from './service-config'; @@ -30,17 +35,17 @@ import { Metadata } from './metadata'; * Matches 4 groups of up to 3 digits each, separated by periods, optionally * followed by a colon and a number. */ -const IPv4_REGEX = /^(\d{1,3}(?:\.\d{1,3}){3})(?::(\d+))?$/; +const IPV4_REGEX = /^(\d{1,3}(?:\.\d{1,3}){3})(?::(\d+))?$/; /** * Matches any number of groups of up to 4 hex digits (case insensitive) * separated by 1 or more colons. This variant does not match a port number. */ -const IPv6_REGEX = /^([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)$/i; +const IPV6_REGEX = /^([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)$/i; /** * Matches the same as the IPv6_REGEX, surrounded by square brackets, and * optionally followed by a colon and a number. */ -const IPv6_BRACKET_REGEX = /^\[([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)\](?::(\d+))?$/i; +const IPV6_BRACKET_REGEX = /^\[([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)\](?::(\d+))?$/i; /** * Matches `[dns:][//authority/]host[:port]`, where `authority` and `host` are @@ -60,13 +65,16 @@ const resolve6Promise = util.promisify(dns.resolve6); /** * Attempt to parse a target string as an IP address - * @param target + * @param target * @return An "IP:port" string if parsing was successful, `null` otherwise */ function parseIP(target: string): string | null { /* These three regular expressions are all mutually exclusive, so we just * want the first one that matches the target string, if any do. */ - const match = IPv4_REGEX.exec(target) || IPv6_REGEX.exec(target) || IPv6_BRACKET_REGEX.exec(target); + const match = + IPV4_REGEX.exec(target) || + IPV6_REGEX.exec(target) || + IPV6_BRACKET_REGEX.exec(target); if (match === null) { return null; } @@ -82,13 +90,17 @@ function parseIP(target: string): string | null { /** * Merge any number of arrays into a single alternating array - * @param arrays + * @param arrays */ function mergeArrays(...arrays: T[][]): T[] { const result: T[] = []; - for(let i = 0; i array.length)); i++) { - for(let array of arrays) { - if(i < array.length) { + for ( + let i = 0; + i < Math.max.apply(null, arrays.map(array => array.length)); + i++ + ) { + for (const array of arrays) { + if (i < array.length) { result.push(array[i]); } } @@ -105,7 +117,9 @@ class DnsResolver implements Resolver { private readonly port: string | null; /* The promise results here contain, in order, the A record, the AAAA record, * and either the TXT record or an error if TXT resolution failed */ - pendingResultPromise: Promise<[string[], string[], string[][] | Error]> | null = null; + pendingResultPromise: Promise< + [string[], string[], string[][] | Error] + > | null = null; percentage: number; constructor(private target: string, private listener: ResolverListener) { this.ipResult = parseIP(target); @@ -126,7 +140,7 @@ class DnsResolver implements Resolver { /** * If the target is an IP address, just provide that address as a result. - * Otherwise, initiate A, AAAA, and TXT + * Otherwise, initiate A, AAAA, and TXT */ private startResolution() { if (this.ipResult !== null) { @@ -137,12 +151,12 @@ class DnsResolver implements Resolver { } if (this.dnsHostname !== null) { const hostname: string = this.dnsHostname; - const Aresult = resolve4Promise(hostname); - const AAAAresult = resolve6Promise(hostname); + const aResult = resolve4Promise(hostname); + const aaaaResult = resolve6Promise(hostname); /* We handle the TXT query promise differently than the others because * the name resolution attempt as a whole is a success even if the TXT * lookup fails */ - const TXTresult = new Promise((resolve, reject) => { + const txtResult = new Promise((resolve, reject) => { dns.resolveTxt(hostname, (err, records) => { if (err) { resolve(err); @@ -151,41 +165,51 @@ class DnsResolver implements Resolver { } }); }); - this.pendingResultPromise = Promise.all([Aresult, AAAAresult, TXTresult]); - this.pendingResultPromise.then(([Arecord, AAAArecord, TXTrecord]) => { - this.pendingResultPromise = null; - Arecord = Arecord.map((value) => `${value}:${this.port}`); - AAAArecord = AAAArecord.map((value) => `[${value}]:${this.port}`); - const allAddresses: string[] = mergeArrays(AAAArecord, Arecord); - let serviceConfig: ServiceConfig | null = null; - let serviceConfigError: StatusObject | null = null; - if (TXTrecord instanceof Error) { - serviceConfigError = { - code: Status.UNAVAILABLE, - details: 'TXT query failed', - metadata: new Metadata() - }; - } else { - try { - serviceConfig = extractAndSelectServiceConfig(TXTrecord, this.percentage); - } catch (err) { + this.pendingResultPromise = Promise.all([aResult, aaaaResult, txtResult]); + this.pendingResultPromise.then( + ([aRecord, aaaaRecord, txtRecord]) => { + this.pendingResultPromise = null; + aRecord = aRecord.map(value => `${value}:${this.port}`); + aaaaRecord = aaaaRecord.map(value => `[${value}]:${this.port}`); + const allAddresses: string[] = mergeArrays(aaaaRecord, aRecord); + let serviceConfig: ServiceConfig | null = null; + let serviceConfigError: StatusObject | null = null; + if (txtRecord instanceof Error) { serviceConfigError = { code: Status.UNAVAILABLE, - details: 'Parsing service config failed', - metadata: new Metadata() + details: 'TXT query failed', + metadata: new Metadata(), }; + } else { + try { + serviceConfig = extractAndSelectServiceConfig( + txtRecord, + this.percentage + ); + } catch (err) { + serviceConfigError = { + code: Status.UNAVAILABLE, + details: 'Parsing service config failed', + metadata: new Metadata(), + }; + } } + this.listener.onSuccessfulResolution( + allAddresses, + serviceConfig, + serviceConfigError + ); + }, + err => { + this.pendingResultPromise = null; + this.listener.onError({ + code: Status.UNAVAILABLE, + details: 'Name resolution failed', + metadata: new Metadata(), + }); + this.listener.onError(err); } - this.listener.onSuccessfulResolution(allAddresses, serviceConfig, serviceConfigError); - }, (err) => { - this.pendingResultPromise = null; - this.listener.onError({ - code: Status.UNAVAILABLE, - details: 'Name resolution failed', - metadata: new Metadata() - }); - this.listener.onError(err); - }); + ); } } @@ -198,10 +222,13 @@ class DnsResolver implements Resolver { /** * Get the default authority for the given target. For IP targets, that is * the IP address. For DNS targets, it is the hostname. - * @param target + * @param target */ static getDefaultAuthority(target: string): string { - const ipMatch = IPv4_REGEX.exec(target) || IPv6_REGEX.exec(target) || IPv6_BRACKET_REGEX.exec(target); + const ipMatch = + IPV4_REGEX.exec(target) || + IPV6_REGEX.exec(target) || + IPV6_BRACKET_REGEX.exec(target); if (ipMatch) { return ipMatch[1]; } @@ -220,4 +247,4 @@ class DnsResolver implements Resolver { export function setup(): void { registerResolver('dns:', DnsResolver); registerDefaultResolver(DnsResolver); -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index a27782c2..b4c69f3d 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -15,10 +15,10 @@ * */ -import { ServiceError } from "./call"; -import { ServiceConfig } from "./service-config"; +import { ServiceError } from './call'; +import { ServiceConfig } from './service-config'; import * as resolver_dns from './resolver-dns'; -import { StatusObject } from "./call-stream"; +import { StatusObject } from './call-stream'; /** * A listener object passed to the resolver's constructor that provides name @@ -34,7 +34,11 @@ export interface ResolverListener { * @param serviceConfigError If non-`null`, indicates that the retrieved * service configuration was invalid */ - onSuccessfulResolution(addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null): void; + onSuccessfulResolution( + addressList: string[], + serviceConfig: ServiceConfig | null, + serviceConfigError: StatusObject | null + ): void; /** * Called whenever a name resolution attempt fails. * @param error Describes how resolution failed @@ -56,36 +60,38 @@ export interface Resolver { updateResolution(): void; } - export interface ResolverConstructor { - new(target: string, listener: ResolverListener): Resolver; + new (target: string, listener: ResolverListener): Resolver; /** * Get the default authority for a target. This loosely corresponds to that * target's hostname. Throws an error if this resolver class cannot parse the * `target`. * @param target */ - getDefaultAuthority(target:string): string; + getDefaultAuthority(target: string): string; } -const registeredResolvers: {[prefix: string]: ResolverConstructor} = {}; +const registeredResolvers: { [prefix: string]: ResolverConstructor } = {}; let defaultResolver: ResolverConstructor | null = null; /** * Register a resolver class to handle target names prefixed with the `prefix` * string. This prefix should correspond to a URI scheme name listed in the * [gRPC Name Resolution document](https://github.com/grpc/grpc/blob/master/doc/naming.md) - * @param prefix - * @param resolverClass + * @param prefix + * @param resolverClass */ -export function registerResolver(prefix: string, resolverClass: ResolverConstructor) { +export function registerResolver( + prefix: string, + resolverClass: ResolverConstructor +) { registeredResolvers[prefix] = resolverClass; } /** * Register a default resolver to handle target names that do not start with * any registered prefix. - * @param resolverClass + * @param resolverClass */ export function registerDefaultResolver(resolverClass: ResolverConstructor) { defaultResolver = resolverClass; @@ -94,10 +100,13 @@ export function registerDefaultResolver(resolverClass: ResolverConstructor) { /** * Create a name resolver for the specified target, if possible. Throws an * error if no such name resolver can be created. - * @param target - * @param listener + * @param target + * @param listener */ -export function createResolver(target: string, listener: ResolverListener): Resolver { +export function createResolver( + target: string, + listener: ResolverListener +): Resolver { for (const prefix of Object.keys(registeredResolvers)) { if (target.startsWith(prefix)) { return new registeredResolvers[prefix](target, listener); @@ -112,7 +121,7 @@ export function createResolver(target: string, listener: ResolverListener): Reso /** * Get the default authority for the specified target, if possible. Throws an * error if no registered name resolver can parse that target string. - * @param target + * @param target */ export function getDefaultAuthority(target: string): string { for (const prefix of Object.keys(registerDefaultResolver)) { @@ -128,4 +137,4 @@ export function getDefaultAuthority(target: string): string { export function registerAll() { resolver_dns.setup(); -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 6d13f75f..12f1b3b7 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -15,18 +15,23 @@ * */ -import { ChannelControlHelper, LoadBalancer, isLoadBalancerNameRegistered, createLoadBalancer } from "./load-balancer"; -import { ServiceConfig } from "./service-config"; -import { ConnectivityState } from "./channel"; -import { createResolver, Resolver } from "./resolver"; -import { ServiceError } from "./call"; -import { ChannelOptions } from "./channel-options"; -import { Picker, UnavailablePicker, QueuePicker } from "./picker"; -import { LoadBalancingConfig } from "./load-balancing-config"; -import { BackoffTimeout } from "./backoff-timeout"; -import { Status } from "./constants"; -import { StatusObject } from "./call-stream"; -import { Metadata } from "./metadata"; +import { + ChannelControlHelper, + LoadBalancer, + isLoadBalancerNameRegistered, + createLoadBalancer, +} from './load-balancer'; +import { ServiceConfig } from './service-config'; +import { ConnectivityState } from './channel'; +import { createResolver, Resolver } from './resolver'; +import { ServiceError } from './call'; +import { ChannelOptions } from './channel-options'; +import { Picker, UnavailablePicker, QueuePicker } from './picker'; +import { LoadBalancingConfig } from './load-balancing-config'; +import { BackoffTimeout } from './backoff-timeout'; +import { Status } from './constants'; +import { StatusObject } from './call-stream'; +import { Metadata } from './metadata'; const DEFAULT_LOAD_BALANCER_NAME = 'pick_first'; @@ -103,10 +108,18 @@ export class ResolvingLoadBalancer implements LoadBalancer { * In practice, that means using the "pick first" load balancer * implmentation */ - constructor (private target: string, private channelControlHelper: ChannelControlHelper, private defaultServiceConfig: ServiceConfig | null) { + constructor( + private target: string, + private channelControlHelper: ChannelControlHelper, + private defaultServiceConfig: ServiceConfig | null + ) { this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); this.innerResolver = createResolver(target, { - onSuccessfulResolution: (addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null) => { + onSuccessfulResolution: ( + addressList: string[], + serviceConfig: ServiceConfig | null, + serviceConfigError: ServiceError | null + ) => { let workingServiceConfig: ServiceConfig | null = null; /* This first group of conditionals implements the algorithm described * in https://github.com/grpc/proposal/blob/master/A21-service-config-error-handling.md @@ -127,7 +140,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { this.handleResolutionFailure(serviceConfigError); } else { // Step 4.ii.a - workingServiceConfig = this.defaultServiceConfig + workingServiceConfig = this.defaultServiceConfig; } } else { // Step 4.i @@ -141,7 +154,10 @@ export class ResolvingLoadBalancer implements LoadBalancer { } let loadBalancerName: string | null = null; let loadBalancingConfig: LoadBalancingConfig | null = null; - if (workingServiceConfig === null || workingServiceConfig.loadBalancingConfig.length === 0) { + if ( + workingServiceConfig === null || + workingServiceConfig.loadBalancingConfig.length === 0 + ) { loadBalancerName = DEFAULT_LOAD_BALANCER_NAME; } else { for (const lbConfig of workingServiceConfig.loadBalancingConfig) { @@ -163,39 +179,68 @@ export class ResolvingLoadBalancer implements LoadBalancer { // There were load balancing configs but none are supported. This counts as a resolution failure this.handleResolutionFailure({ code: Status.UNAVAILABLE, - details: 'All load balancer options in service config are not compatible', - metadata: new Metadata() + details: + 'All load balancer options in service config are not compatible', + metadata: new Metadata(), }); return; } } if (this.innerLoadBalancer === null) { - this.innerLoadBalancer = createLoadBalancer(loadBalancerName, this.innerChannelControlHelper)!; - this.innerLoadBalancer.updateAddressList(addressList, loadBalancingConfig); + this.innerLoadBalancer = createLoadBalancer( + loadBalancerName, + this.innerChannelControlHelper + )!; + this.innerLoadBalancer.updateAddressList( + addressList, + loadBalancingConfig + ); } else if (this.innerLoadBalancer.getTypeName() === loadBalancerName) { - this.innerLoadBalancer.updateAddressList(addressList, loadBalancingConfig); + this.innerLoadBalancer.updateAddressList( + addressList, + loadBalancingConfig + ); } else { - if (this.pendingReplacementLoadBalancer === null || this.pendingReplacementLoadBalancer.getTypeName() !== loadBalancerName) { + if ( + this.pendingReplacementLoadBalancer === null || + this.pendingReplacementLoadBalancer.getTypeName() !== + loadBalancerName + ) { if (this.pendingReplacementLoadBalancer !== null) { this.pendingReplacementLoadBalancer.destroy(); } - this.pendingReplacementLoadBalancer = createLoadBalancer(loadBalancerName, this.replacementChannelControlHelper)!; + this.pendingReplacementLoadBalancer = createLoadBalancer( + loadBalancerName, + this.replacementChannelControlHelper + )!; } - this.pendingReplacementLoadBalancer.updateAddressList(addressList, loadBalancingConfig); + this.pendingReplacementLoadBalancer.updateAddressList( + addressList, + loadBalancingConfig + ); } }, onError: (error: StatusObject) => { this.handleResolutionFailure(error); - } + }, }); this.innerChannelControlHelper = { - createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => { - return this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs); + createSubchannel: ( + subchannelAddress: string, + subchannelArgs: ChannelOptions + ) => { + return this.channelControlHelper.createSubchannel( + subchannelAddress, + subchannelArgs + ); }, updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.innerBalancerState = connectivityState; - if (connectivityState !== ConnectivityState.READY && this.pendingReplacementLoadBalancer !== null) { + if ( + connectivityState !== ConnectivityState.READY && + this.pendingReplacementLoadBalancer !== null + ) { this.switchOverReplacementBalancer(); } else { this.updateState(connectivityState, picker); @@ -211,12 +256,18 @@ export class ResolvingLoadBalancer implements LoadBalancer { this.innerResolver.updateResolution(); } } - } - } + }, + }; this.replacementChannelControlHelper = { - createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => { - return this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs); + createSubchannel: ( + subchannelAddress: string, + subchannelArgs: ChannelOptions + ) => { + return this.channelControlHelper.createSubchannel( + subchannelAddress, + subchannelArgs + ); }, updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.replacementBalancerState = connectivityState; @@ -233,7 +284,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { * updateResolution */ this.innerResolver.updateResolution(); } - } + }, }; this.backoffTimeout = new BackoffTimeout(() => { @@ -258,15 +309,23 @@ export class ResolvingLoadBalancer implements LoadBalancer { private switchOverReplacementBalancer() { this.innerLoadBalancer!.destroy(); this.innerLoadBalancer = this.pendingReplacementLoadBalancer!; - this.innerLoadBalancer.replaceChannelControlHelper(this.innerChannelControlHelper); + this.innerLoadBalancer.replaceChannelControlHelper( + this.innerChannelControlHelper + ); this.pendingReplacementLoadBalancer = null; this.innerBalancerState = this.replacementBalancerState; - this.updateState(this.replacementBalancerState, this.replacementBalancerPicker); + this.updateState( + this.replacementBalancerState, + this.replacementBalancerPicker + ); } private handleResolutionFailure(error: StatusObject) { if (this.innerLoadBalancer === null) { - this.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker(error)); + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker(error) + ); } this.backoffTimeout.runOnce(); } @@ -276,10 +335,16 @@ export class ResolvingLoadBalancer implements LoadBalancer { if (this.innerLoadBalancer !== null) { this.innerLoadBalancer.exitIdle(); } - this.channelControlHelper.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); + this.channelControlHelper.updateState( + ConnectivityState.CONNECTING, + new QueuePicker(this) + ); } - updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null) { + updateAddressList( + addressList: string[], + lbConfig: LoadBalancingConfig | null + ) { throw new Error('updateAddressList not supported on ResolvingLoadBalancer'); } @@ -312,4 +377,4 @@ export class ResolvingLoadBalancer implements LoadBalancer { replaceChannelControlHelper(channelControlHelper: ChannelControlHelper) { this.channelControlHelper = channelControlHelper; } -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/service-config.ts b/packages/grpc-js/src/service-config.ts index b4f1f647..d9b60812 100644 --- a/packages/grpc-js/src/service-config.ts +++ b/packages/grpc-js/src/service-config.ts @@ -22,8 +22,11 @@ * specific object type if the input has the right structure, and throws an * error otherwise. */ +/* The any type is purposely used here. All functions validate their input at + * runtime */ +/* tslint:disable:no-any */ + import * as lbconfig from './load-balancing-config'; -import { isString, isArray, isBoolean, isNumber } from 'util'; import * as os from 'os'; export interface MethodConfigName { @@ -41,11 +44,10 @@ export interface MethodConfig { export interface ServiceConfig { loadBalancingPolicy?: string; - loadBalancingConfig: lbconfig.LoadBalancingConfig[] + loadBalancingConfig: lbconfig.LoadBalancingConfig[]; methodConfig: MethodConfig[]; } - export interface ServiceConfigCanaryConfig { clientLanguage?: string[]; percentage?: number; @@ -66,14 +68,14 @@ const TIMEOUT_REGEX = /^\d+(\.\d{1,9})?s$/; const CLIENT_LANGUAGE_STRING = 'node'; function validateName(obj: any): MethodConfigName { - if (!('service' in obj) || !isString(obj.service)) { + if (!('service' in obj) || typeof obj.service !== 'string') { throw new Error('Invalid method config name: invalid service'); } const result: MethodConfigName = { - service: obj.service + service: obj.service, }; if ('method' in obj) { - if (isString(obj.method)) { + if (typeof obj.method === 'string') { result.method = obj.method; } else { throw new Error('Invalid method config name: invalid method'); @@ -84,34 +86,37 @@ function validateName(obj: any): MethodConfigName { function validateMethodConfig(obj: any): MethodConfig { const result: MethodConfig = { - name: [] + name: [], }; - if (!('name' in obj) || !isArray(obj.name)) { + if (!('name' in obj) || !Array.isArray(obj.name)) { throw new Error('Invalid method config: invalid name array'); } for (const name of obj.name) { result.name.push(validateName(name)); } if ('waitForReady' in obj) { - if (!isBoolean(obj.waitForReady)) { + if (typeof obj.waitForReady !== 'boolean') { throw new Error('Invalid method config: invalid waitForReady'); } result.waitForReady = obj.waitForReady; } if ('timeout' in obj) { - if (!isString(obj.timeout) || !TIMEOUT_REGEX.test(obj.timeout)) { + if ( + !(typeof obj.timeout === 'string') || + !TIMEOUT_REGEX.test(obj.timeout) + ) { throw new Error('Invalid method config: invalid timeout'); } result.timeout = obj.timeout; } if ('maxRequestBytes' in obj) { - if (!isNumber(obj.maxRequestBytes)) { + if (typeof obj.maxRequestBytes !== 'number') { throw new Error('Invalid method config: invalid maxRequestBytes'); } result.maxRequestBytes = obj.maxRequestBytes; } if ('maxResponseBytes' in obj) { - if (!isNumber(obj.maxResponseBytes)) { + if (typeof obj.maxResponseBytes !== 'number') { throw new Error('Invalid method config: invalid maxRequestBytes'); } result.maxResponseBytes = obj.maxResponseBytes; @@ -122,17 +127,17 @@ function validateMethodConfig(obj: any): MethodConfig { function validateServiceConfig(obj: any): ServiceConfig { const result: ServiceConfig = { loadBalancingConfig: [], - methodConfig: [] + methodConfig: [], }; if ('loadBalancingPolicy' in obj) { - if (isString(obj.loadBalancingPolicy)) { + if (typeof obj.loadBalancingPolicy === 'string') { result.loadBalancingPolicy = obj.loadBalancingPolicy; } else { throw new Error('Invalid service config: invalid loadBalancingPolicy'); } } if ('loadBalancingConfig' in obj) { - if (isArray(obj.loadBalancingConfig)) { + if (Array.isArray(obj.loadBalancingConfig)) { for (const config of obj.loadBalancingConfig) { result.loadBalancingConfig.push(lbconfig.validateConfig(config)); } @@ -141,7 +146,7 @@ function validateServiceConfig(obj: any): ServiceConfig { } } if ('methodConfig' in obj) { - if (isArray(obj.methodConfig)) { + if (Array.isArray(obj.methodConfig)) { for (const methodConfig of obj.methodConfig) { result.methodConfig.push(validateMethodConfig(methodConfig)); } @@ -152,8 +157,15 @@ function validateServiceConfig(obj: any): ServiceConfig { for (const methodConfig of result.methodConfig) { for (const name of methodConfig.name) { for (const seenName of seenMethodNames) { - if (name.service === seenName.service && name.method === seenName.method) { - throw new Error(`Invalid service config: duplicate name ${name.service}/${name.method}`); + if ( + name.service === seenName.service && + name.method === seenName.method + ) { + throw new Error( + `Invalid service config: duplicate name ${name.service}/${ + name.method + }` + ); } } seenMethodNames.push(name); @@ -167,16 +179,18 @@ function validateCanaryConfig(obj: any): ServiceConfigCanaryConfig { throw new Error('Invalid service config choice: missing service config'); } const result: ServiceConfigCanaryConfig = { - serviceConfig: validateServiceConfig(obj.serviceConfig) - } + serviceConfig: validateServiceConfig(obj.serviceConfig), + }; if ('clientLanguage' in obj) { - if (isArray(obj.clientLanguage)) { + if (Array.isArray(obj.clientLanguage)) { result.clientLanguage = []; for (const lang of obj.clientLanguage) { - if (isString(lang)) { + if (typeof lang === 'string') { result.clientLanguage.push(lang); } else { - throw new Error('Invalid service config choice: invalid clientLanguage'); + throw new Error( + 'Invalid service config choice: invalid clientLanguage' + ); } } } else { @@ -184,13 +198,15 @@ function validateCanaryConfig(obj: any): ServiceConfigCanaryConfig { } } if ('clientHostname' in obj) { - if (isArray(obj.clientHostname)) { + if (Array.isArray(obj.clientHostname)) { result.clientHostname = []; for (const lang of obj.clientHostname) { - if (isString(lang)) { + if (typeof lang === 'string') { result.clientHostname.push(lang); } else { - throw new Error('Invalid service config choice: invalid clientHostname'); + throw new Error( + 'Invalid service config choice: invalid clientHostname' + ); } } } else { @@ -198,34 +214,51 @@ function validateCanaryConfig(obj: any): ServiceConfigCanaryConfig { } } if ('percentage' in obj) { - if (isNumber(obj.percentage) && 0 <= obj.percentage && obj.percentage <= 100) { + if ( + typeof obj.percentage === 'number' && + 0 <= obj.percentage && + obj.percentage <= 100 + ) { result.percentage = obj.percentage; } else { throw new Error('Invalid service config choice: invalid percentage'); } } // Validate that no unexpected fields are present - const allowedFields = ['clientLanguage', 'percentage', 'clientHostname', 'serviceConfig']; + const allowedFields = [ + 'clientLanguage', + 'percentage', + 'clientHostname', + 'serviceConfig', + ]; for (const field in obj) { if (!allowedFields.includes(field)) { - throw new Error(`Invalid service config choice: unexpected field ${field}`); + throw new Error( + `Invalid service config choice: unexpected field ${field}` + ); } } return result; } -function validateAndSelectCanaryConfig(obj: any, percentage: number): ServiceConfig { - if (!isArray(obj)) { +function validateAndSelectCanaryConfig( + obj: any, + percentage: number +): ServiceConfig { + if (!Array.isArray(obj)) { throw new Error('Invalid service config list'); } for (const config of obj) { const validatedConfig = validateCanaryConfig(config); /* For each field, we check if it is present, then only discard the * config if the field value does not match the current client */ - if (isNumber(validatedConfig.percentage) && percentage > validatedConfig.percentage) { + if ( + typeof validatedConfig.percentage === 'number' && + percentage > validatedConfig.percentage + ) { continue; } - if (isArray(validatedConfig.clientHostname)) { + if (Array.isArray(validatedConfig.clientHostname)) { let hostnameMatched = false; for (const hostname of validatedConfig.clientHostname) { if (hostname === os.hostname()) { @@ -236,7 +269,7 @@ function validateAndSelectCanaryConfig(obj: any, percentage: number): ServiceCon continue; } } - if (isArray(validatedConfig.clientLanguage)) { + if (Array.isArray(validatedConfig.clientLanguage)) { let languageMatched = false; for (const language of validatedConfig.clientLanguage) { if (language === CLIENT_LANGUAGE_STRING) { @@ -261,7 +294,10 @@ function validateAndSelectCanaryConfig(obj: any, percentage: number): ServiceCon * @return The service configuration to use, given the percentage value, or null if the service config * data has a valid format but none of the options match the current client. */ -export function extractAndSelectServiceConfig(txtRecord: string[][], percentage: number): ServiceConfig | null { +export function extractAndSelectServiceConfig( + txtRecord: string[][], + percentage: number +): ServiceConfig | null { for (const record of txtRecord) { if (record.length > 0 && record[0].startsWith('grpc_config=')) { /* Treat the list of strings in this record as a single string and remove @@ -272,4 +308,4 @@ export function extractAndSelectServiceConfig(txtRecord: string[][], percentage: } } return null; -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/subchannel-pool.ts b/packages/grpc-js/src/subchannel-pool.ts index bbefc16c..1a5015b6 100644 --- a/packages/grpc-js/src/subchannel-pool.ts +++ b/packages/grpc-js/src/subchannel-pool.ts @@ -15,9 +15,9 @@ * */ -import { ChannelOptions, channelOptionsEqual } from "./channel-options"; -import { Subchannel } from "./subchannel"; -import { ChannelCredentials } from "./channel-credentials"; +import { ChannelOptions, channelOptionsEqual } from './channel-options'; +import { Subchannel } from './subchannel'; +import { ChannelCredentials } from './channel-credentials'; // 10 seconds in milliseconds. This value is arbitrary. /** @@ -27,7 +27,15 @@ import { ChannelCredentials } from "./channel-credentials"; const REF_CHECK_INTERVAL = 10_000; export class SubchannelPool { - private pool: {[channelTarget: string]: {[subchannelTarget: string]: {channelArguments: ChannelOptions, channelCredentials: ChannelCredentials, subchannel: Subchannel}[]}} = Object.create(null); + private pool: { + [channelTarget: string]: { + [subchannelTarget: string]: Array<{ + channelArguments: ChannelOptions; + channelCredentials: ChannelCredentials; + subchannel: Subchannel; + }>; + }; + } = Object.create(null); /** * A pool of subchannels use for making connections. Subchannels with the @@ -38,13 +46,24 @@ export class SubchannelPool { constructor(private global: boolean) { if (global) { setInterval(() => { + /* These objects are created with Object.create(null), so they do not + * have a prototype, which means that for (... in ...) loops over them + * do not need to be filtered */ + // tslint:disable-next-line:forin for (const channelTarget in this.pool) { + // tslint:disable-next-line:forin for (const subchannelTarget in this.pool[channelTarget]) { - const subchannelObjArray = this.pool[channelTarget][subchannelTarget]; + const subchannelObjArray = this.pool[channelTarget][ + subchannelTarget + ]; /* For each subchannel in the pool, try to unref it if it has * exactly one ref (which is the ref from the pool itself). If that * does happen, remove the subchannel from the pool */ - this.pool[channelTarget][subchannelTarget] = subchannelObjArray.filter((value) => !value.subchannel.unrefIfOneRef()); + this.pool[channelTarget][ + subchannelTarget + ] = subchannelObjArray.filter( + value => !value.subchannel.unrefIfOneRef() + ); } } /* Currently we do not delete keys with empty values. If that results @@ -57,31 +76,51 @@ export class SubchannelPool { /** * Get a subchannel if one already exists with exactly matching parameters. * Otherwise, create and save a subchannel with those parameters. - * @param channelTarget - * @param subchannelTarget - * @param channelArguments - * @param channelCredentials + * @param channelTarget + * @param subchannelTarget + * @param channelArguments + * @param channelCredentials */ - getOrCreateSubchannel(channelTarget: string, subchannelTarget: string, channelArguments: ChannelOptions, channelCredentials: ChannelCredentials): Subchannel { + getOrCreateSubchannel( + channelTarget: string, + subchannelTarget: string, + channelArguments: ChannelOptions, + channelCredentials: ChannelCredentials + ): Subchannel { if (channelTarget in this.pool) { - if (subchannelTarget in this.pool[channelTarget]){ + if (subchannelTarget in this.pool[channelTarget]) { const subchannelObjArray = this.pool[channelTarget][subchannelTarget]; for (const subchannelObj of subchannelObjArray) { - if (channelOptionsEqual(channelArguments, subchannelObj.channelArguments) && channelCredentials._equals(subchannelObj.channelCredentials)) { + if ( + channelOptionsEqual( + channelArguments, + subchannelObj.channelArguments + ) && + channelCredentials._equals(subchannelObj.channelCredentials) + ) { return subchannelObj.subchannel; } } } } // If we get here, no matching subchannel was found - const subchannel = new Subchannel(channelTarget, subchannelTarget, channelArguments, channelCredentials); + const subchannel = new Subchannel( + channelTarget, + subchannelTarget, + channelArguments, + channelCredentials + ); if (!(channelTarget in this.pool)) { this.pool[channelTarget] = Object.create(null); } if (!(subchannelTarget in this.pool[channelTarget])) { this.pool[channelTarget][subchannelTarget] = []; } - this.pool[channelTarget][subchannelTarget].push({channelArguments, channelCredentials, subchannel}); + this.pool[channelTarget][subchannelTarget].push({ + channelArguments, + channelCredentials, + subchannel, + }); if (this.global) { subchannel.ref(); } @@ -93,7 +132,7 @@ const globalSubchannelPool = new SubchannelPool(true); /** * Get either the global subchannel pool, or a new subchannel pool. - * @param global + * @param global */ export function getSubchannelPool(global: boolean): SubchannelPool { if (global) { @@ -101,4 +140,4 @@ export function getSubchannelPool(global: boolean): SubchannelPool { } else { return new SubchannelPool(false); } -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 5dae2c22..2683643b 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -38,7 +38,11 @@ const BACKOFF_JITTER = 0.2; const KEEPALIVE_TIME_MS = ~(1 << 31); const KEEPALIVE_TIMEOUT_MS = 20000; -export type ConnectivityStateListener = (subchannel: Subchannel, previousState: ConnectivityState, newState: ConnectivityState) => void; +export type ConnectivityStateListener = ( + subchannel: Subchannel, + previousState: ConnectivityState, + newState: ConnectivityState +) => void; const { HTTP2_HEADER_AUTHORITY, @@ -51,8 +55,8 @@ const { /** * Get a number uniformly at random in the range [min, max) - * @param min - * @param max + * @param min + * @param max */ function uniformRandom(min: number, max: number) { return Math.random() * (max - min) + min; @@ -72,7 +76,7 @@ export class Subchannel { * Indicates that the subchannel should transition from TRANSIENT_FAILURE to * CONNECTING instead of IDLE when the backoff timeout ends. */ - private continueConnecting: boolean = false; + private continueConnecting = false; /** * A list of listener functions that will be called whenever the connectivity * state changes. Will be modified by `addConnectivityStateListener` and @@ -107,11 +111,11 @@ export class Subchannel { /** * Tracks calls with references to this subchannel */ - private callRefcount: number = 0; + private callRefcount = 0; /** * Tracks channels and subchannel pools with references to this subchannel */ - private refcount: number = 0; + private refcount = 0; /** * A class representing a connection to a single backend. @@ -123,40 +127,45 @@ export class Subchannel { * @param credentials The channel credentials used to establish this * connection */ - constructor(private channelTarget: string, + constructor( + private channelTarget: string, private subchannelAddress: string, private options: ChannelOptions, - private credentials: ChannelCredentials) { - // Build user-agent string. - this.userAgent = [ - options['grpc.primary_user_agent'], - `grpc-node-js/${clientVersion}`, - options['grpc.secondary_user_agent'], - ] - .filter(e => e) - .join(' '); // remove falsey values first + private credentials: ChannelCredentials + ) { + // Build user-agent string. + this.userAgent = [ + options['grpc.primary_user_agent'], + `grpc-node-js/${clientVersion}`, + options['grpc.secondary_user_agent'], + ] + .filter(e => e) + .join(' '); // remove falsey values first - if ('grpc.keepalive_time_ms' in options) { - this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; - } - if ('grpc.keepalive_timeout_ms' in options) { - this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; - } - this.keepaliveIntervalId = setTimeout(() => {}, 0); - clearTimeout(this.keepaliveIntervalId); - this.keepaliveTimeoutId = setTimeout(() => {}, 0); - clearTimeout(this.keepaliveTimeoutId); - this.backoffTimeout = new BackoffTimeout(() => { - - if (this.continueConnecting) { - this.transitionToState([ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING], - ConnectivityState.CONNECTING); - } else { - this.transitionToState([ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING], - ConnectivityState.IDLE); - } - }); + if ('grpc.keepalive_time_ms' in options) { + this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; } + if ('grpc.keepalive_timeout_ms' in options) { + this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; + } + this.keepaliveIntervalId = setTimeout(() => {}, 0); + clearTimeout(this.keepaliveIntervalId); + this.keepaliveTimeoutId = setTimeout(() => {}, 0); + clearTimeout(this.keepaliveTimeoutId); + this.backoffTimeout = new BackoffTimeout(() => { + if (this.continueConnecting) { + this.transitionToState( + [ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING], + ConnectivityState.CONNECTING + ); + } else { + this.transitionToState( + [ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING], + ConnectivityState.IDLE + ); + } + }); + } /** * Start a backoff timer with the current nextBackoff timeout @@ -195,7 +204,7 @@ export class Subchannel { private startConnectingInternal() { const connectionOptions: http2.SecureClientSessionOptions = - this.credentials._getConnectionOptions() || {}; + this.credentials._getConnectionOptions() || {}; let addressScheme = 'http://'; if ('secureContext' in connectionOptions) { addressScheme = 'https://'; @@ -217,20 +226,30 @@ export class Subchannel { connectionOptions.servername = this.channelTarget; } } - this.session = http2.connect(addressScheme + this.subchannelAddress, connectionOptions); + this.session = http2.connect( + addressScheme + this.subchannelAddress, + connectionOptions + ); this.session.unref(); this.session.once('connect', () => { - this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY); + this.transitionToState( + [ConnectivityState.CONNECTING], + ConnectivityState.READY + ); }); this.session.once('close', () => { - this.transitionToState([ConnectivityState.CONNECTING, ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE); + this.transitionToState( + [ConnectivityState.CONNECTING, ConnectivityState.READY], + ConnectivityState.TRANSIENT_FAILURE + ); }); this.session.once('goaway', () => { - this.transitionToState([ConnectivityState.CONNECTING, ConnectivityState.READY], - ConnectivityState.IDLE); + this.transitionToState( + [ConnectivityState.CONNECTING, ConnectivityState.READY], + ConnectivityState.IDLE + ); }); - this.session.once('error', (error) => { + this.session.once('error', error => { /* Do nothing here. Any error should also trigger a close event, which is * where we want to handle that. */ }); @@ -250,7 +269,7 @@ export class Subchannel { if (oldStates.indexOf(this.connectivityState) === -1) { return false; } - let previousState = this.connectivityState; + const previousState = this.connectivityState; this.connectivityState = newState; switch (newState) { case ConnectivityState.READY: @@ -272,6 +291,9 @@ export class Subchannel { this.stopBackoff(); this.session = null; this.stopKeepalivePings(); + break; + default: + throw new Error(`Invalid state: unknown ConnectivityState ${newState}`); } /* We use a shallow copy of the stateListeners array in case a listener * is removed during this iteration */ @@ -289,10 +311,14 @@ export class Subchannel { /* If no calls, channels, or subchannel pools have any more references to * this subchannel, we can be sure it will never be used again. */ if (this.callRefcount === 0 && this.refcount === 0) { - this.transitionToState([ConnectivityState.CONNECTING, - ConnectivityState.IDLE, - ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE); + this.transitionToState( + [ + ConnectivityState.CONNECTING, + ConnectivityState.IDLE, + ConnectivityState.READY, + ], + ConnectivityState.TRANSIENT_FAILURE + ); } } @@ -338,8 +364,8 @@ export class Subchannel { * Start a stream on the current session with the given `metadata` as headers * and then attach it to the `callStream`. Must only be called if the * subchannel's current connectivity state is READY. - * @param metadata - * @param callStream + * @param metadata + * @param callStream */ startCallStream(metadata: Metadata, callStream: Http2CallStream) { const headers = metadata.toHttp2Headers(); @@ -368,7 +394,12 @@ export class Subchannel { * because the state is not currently IDLE, check if it is * TRANSIENT_FAILURE, and if so indicate that it should go back to * connecting after the backoff timer ends. Otherwise do nothing */ - if (!this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING)) { + if ( + !this.transitionToState( + [ConnectivityState.IDLE], + ConnectivityState.CONNECTING + ) + ) { if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) { this.continueConnecting = true; } @@ -385,7 +416,7 @@ export class Subchannel { /** * Add a listener function to be called whenever the subchannel's * connectivity state changes. - * @param listener + * @param listener */ addConnectivityStateListener(listener: ConnectivityStateListener) { this.stateListeners.push(listener); @@ -408,6 +439,9 @@ export class Subchannel { */ resetBackoff() { this.backoffTimeout.reset(); - this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], ConnectivityState.CONNECTING); + this.transitionToState( + [ConnectivityState.TRANSIENT_FAILURE], + ConnectivityState.CONNECTING + ); } -} \ No newline at end of file +} diff --git a/packages/grpc-js/test/test-call-stream.ts b/packages/grpc-js/test/test-call-stream.ts index 795f596f..3b4527f5 100644 --- a/packages/grpc-js/test/test-call-stream.ts +++ b/packages/grpc-js/test/test-call-stream.ts @@ -273,7 +273,9 @@ describe('CallStream', () => { frameLengths: range(0, 20).map(() => 1), }, ].forEach((testCase: { description: string; frameLengths: number[] }) => { - it(`should handle a short message where ${testCase.description}`, done => { + it(`should handle a short message where ${ + testCase.description + }`, done => { const callStream = new Http2CallStream( 'foo', {} as ChannelImplementation,