diff --git a/packages/grpc-js-core/src/channel.ts b/packages/grpc-js-core/src/channel.ts index 1b7fda5b..9afe1707 100644 --- a/packages/grpc-js-core/src/channel.ts +++ b/packages/grpc-js-core/src/channel.ts @@ -15,6 +15,12 @@ import {Metadata, MetadataObject} from './metadata'; const IDLE_TIMEOUT_MS = 300000; +const MIN_CONNECT_TIMEOUT_MS = 20000; +const INITIAL_BACKOFF_MS = 1000; +const BACKOFF_MULTIPLIER = 1.6; +const MAX_BACKOFF_MS = 120000; +const BACKOFF_JITTER = 0.2; + const { HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_CONTENT_TYPE, @@ -37,6 +43,10 @@ export enum ConnectivityState { SHUTDOWN } +function uniformRandom(min:number, max: number) { + return Math.random() * (max - min) + min; +} + // todo: maybe we want an interface for load balancing, but no implementation // for anything complicated @@ -62,49 +72,97 @@ export interface Channel extends EventEmitter { export class Http2Channel extends EventEmitter implements Channel { private connectivityState: ConnectivityState = ConnectivityState.IDLE; - private idleTimerId: NodeJS.Timer|null = null; /* For now, we have up to one subchannel, which will exist as long as we are * connecting or trying to connect */ private subChannel: http2.ClientHttp2Session|null; private filterStackFactory: FilterStackFactory; - private transitionToState(newState: ConnectivityState): void { - if (newState !== this.connectivityState) { + private idleTimerId: NodeJS.Timer|null = null; + + private subChannelConnectCallback: ()=>void = () => {}; + private subChannelCloseCallback: ()=>void = () => {}; + + private backoffTimerId: NodeJS.Timer|null = null; + private currentBackoff: number = INITIAL_BACKOFF_MS; + private currentBackoffDeadline: Date; + + private handleStateChange(oldState: ConnectivityState, newState: ConnectivityState): void { + let now: Date = new Date(); + switch(newState) { + case ConnectivityState.CONNECTING: + if (oldState === ConnectivityState.IDLE) { + this.currentBackoff = INITIAL_BACKOFF_MS; + this.currentBackoffDeadline = new Date(now.getTime() + INITIAL_BACKOFF_MS); + } else if (oldState == ConnectivityState.TRANSIENT_FAILURE) { + this.currentBackoff = Math.min(this.currentBackoff * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS); + let jitterMagnitude: number = BACKOFF_JITTER * this.currentBackoff; + this.currentBackoffDeadline = new Date(now.getTime() + this.currentBackoff + uniformRandom(-jitterMagnitude, jitterMagnitude)); + } + this.startConnecting(); + break; + case ConnectivityState.READY: + this.emit('connect'); + break; + case ConnectivityState.TRANSIENT_FAILURE: + this.subChannel = null; + this.backoffTimerId = setTimeout(() => { + this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], ConnectivityState.CONNECTING); + }, this.currentBackoffDeadline.getTime() - now.getTime()); + break; + case ConnectivityState.IDLE: + case ConnectivityState.SHUTDOWN: + if (this.subChannel !== null) { + this.subChannel.shutdown({graceful: true}); + this.subChannel.removeListener('connect', this.subChannelConnectCallback); + this.subChannel.removeListener('close', this.subChannelCloseCallback); + this.subChannel = null; + } + break; + } + } + + // Transition from any of a set of oldStates to a specific newState + private transitionToState(oldStates: [ConnectivityState], newState: ConnectivityState): void { + if (oldStates.indexOf(this.connectivityState) > -1) { + let oldState: ConnectivityState = this.connectivityState; this.connectivityState = newState; + this.handleStateChange(oldState, newState); this.emit('connectivityStateChanged', newState); } } private startConnecting(): void { - this.transitionToState(ConnectivityState.CONNECTING); + let subChannel: http2.ClientHttp2Session; let secureContext = this.credentials.getSecureContext(); if (secureContext === null) { - this.subChannel = http2.connect(this.address); + subChannel = http2.connect(this.address); } else { - this.subChannel = http2.connect(this.address, {secureContext}); - } - this.subChannel.once('connect', () => { - this.transitionToState(ConnectivityState.READY); - }); - this.subChannel.setTimeout(IDLE_TIMEOUT_MS, () => { - this.goIdle(); - }); - /* TODO(murgatroid99): add connection-level error handling with exponential - * reconnection backoff */ - } - - private goIdle(): void { - if (this.subChannel !== null) { - this.subChannel.shutdown({graceful: true}, () => undefined); - this.subChannel = null; - } - this.transitionToState(ConnectivityState.IDLE); - } - - private kickConnectivityState(): void { - if (this.connectivityState === ConnectivityState.IDLE) { - this.startConnecting(); + subChannel = http2.connect(this.address, {secureContext}); } + this.subChannel = subChannel; + let now = new Date(); + let connectionTimeout: number = Math.max( + this.currentBackoffDeadline.getTime() - now.getTime(), + MIN_CONNECT_TIMEOUT_MS); + let connectionTimerId: NodeJS.Timer = setTimeout(() => { + // This should trigger the 'close' event, which will send us back to TRANSIENT_FAILURE + subChannel.shutdown(); + }, connectionTimeout); + this.subChannelConnectCallback = () => { + // Connection succeeded + clearTimeout(connectionTimerId); + this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY); + }; + subChannel.once('connect', this.subChannelConnectCallback); + this.subChannelCloseCallback = () => { + // Connection failed + clearTimeout(connectionTimerId); + /* TODO(murgatroid99): verify that this works for CONNECTING->TRANSITIVE_FAILURE + * see nodejs/node#16645 */ + this.transitionToState([ConnectivityState.CONNECTING, ConnectivityState.READY], + ConnectivityState.TRANSIENT_FAILURE); + }; + subChannel.once('close', this.subChannelCloseCallback); } constructor( @@ -121,6 +179,7 @@ export class Http2Channel extends EventEmitter implements Channel { new CompressionFilterFactory(this), new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this) ]); + this.currentBackoffDeadline = new Date(); } private startHttp2Stream( @@ -158,7 +217,7 @@ export class Http2Channel extends EventEmitter implements Channel { } createStream(methodName: string, metadata: Metadata, options: CallOptions): - CallStream { + CallStream { if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } @@ -174,15 +233,11 @@ export class Http2Channel extends EventEmitter implements Channel { } connect(callback: () => void): void { - this.kickConnectivityState(); + this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING); if (this.connectivityState === ConnectivityState.READY) { setImmediate(callback); } else { - this.once('connectivityStateChanged', (newState) => { - if (newState === ConnectivityState.READY) { - callback(); - } - }); + this.once('connect', callback); } } @@ -194,7 +249,10 @@ export class Http2Channel extends EventEmitter implements Channel { if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } - this.transitionToState(ConnectivityState.SHUTDOWN); + this.transitionToState([ConnectivityState.CONNECTING, + ConnectivityState.READY, + ConnectivityState.TRANSIENT_FAILURE, + ConnectivityState.IDLE], ConnectivityState.SHUTDOWN); if (this.subChannel !== null) { this.subChannel.shutdown({graceful: true}); }