Fix channel state code and add backoff code

This commit is contained in:
murgatroid99 2017-11-01 09:44:30 -07:00
parent b37aaab454
commit 9fcee91822

View File

@ -15,6 +15,12 @@ import {Metadata, MetadataObject} from './metadata';
const IDLE_TIMEOUT_MS = 300000;
const MIN_CONNECT_TIMEOUT_MS = 20000;
const INITIAL_BACKOFF_MS = 1000;
const BACKOFF_MULTIPLIER = 1.6;
const MAX_BACKOFF_MS = 120000;
const BACKOFF_JITTER = 0.2;
const {
HTTP2_HEADER_AUTHORITY,
HTTP2_HEADER_CONTENT_TYPE,
@ -37,6 +43,10 @@ export enum ConnectivityState {
SHUTDOWN
}
function uniformRandom(min:number, max: number) {
return Math.random() * (max - min) + min;
}
// todo: maybe we want an interface for load balancing, but no implementation
// for anything complicated
@ -62,49 +72,97 @@ export interface Channel extends EventEmitter {
export class Http2Channel extends EventEmitter implements Channel {
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private idleTimerId: NodeJS.Timer|null = null;
/* For now, we have up to one subchannel, which will exist as long as we are
* connecting or trying to connect */
private subChannel: http2.ClientHttp2Session|null;
private filterStackFactory: FilterStackFactory;
private transitionToState(newState: ConnectivityState): void {
if (newState !== this.connectivityState) {
private idleTimerId: NodeJS.Timer|null = null;
private subChannelConnectCallback: ()=>void = () => {};
private subChannelCloseCallback: ()=>void = () => {};
private backoffTimerId: NodeJS.Timer|null = null;
private currentBackoff: number = INITIAL_BACKOFF_MS;
private currentBackoffDeadline: Date;
private handleStateChange(oldState: ConnectivityState, newState: ConnectivityState): void {
let now: Date = new Date();
switch(newState) {
case ConnectivityState.CONNECTING:
if (oldState === ConnectivityState.IDLE) {
this.currentBackoff = INITIAL_BACKOFF_MS;
this.currentBackoffDeadline = new Date(now.getTime() + INITIAL_BACKOFF_MS);
} else if (oldState == ConnectivityState.TRANSIENT_FAILURE) {
this.currentBackoff = Math.min(this.currentBackoff * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS);
let jitterMagnitude: number = BACKOFF_JITTER * this.currentBackoff;
this.currentBackoffDeadline = new Date(now.getTime() + this.currentBackoff + uniformRandom(-jitterMagnitude, jitterMagnitude));
}
this.startConnecting();
break;
case ConnectivityState.READY:
this.emit('connect');
break;
case ConnectivityState.TRANSIENT_FAILURE:
this.subChannel = null;
this.backoffTimerId = setTimeout(() => {
this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], ConnectivityState.CONNECTING);
}, this.currentBackoffDeadline.getTime() - now.getTime());
break;
case ConnectivityState.IDLE:
case ConnectivityState.SHUTDOWN:
if (this.subChannel !== null) {
this.subChannel.shutdown({graceful: true});
this.subChannel.removeListener('connect', this.subChannelConnectCallback);
this.subChannel.removeListener('close', this.subChannelCloseCallback);
this.subChannel = null;
}
break;
}
}
// Transition from any of a set of oldStates to a specific newState
private transitionToState(oldStates: [ConnectivityState], newState: ConnectivityState): void {
if (oldStates.indexOf(this.connectivityState) > -1) {
let oldState: ConnectivityState = this.connectivityState;
this.connectivityState = newState;
this.handleStateChange(oldState, newState);
this.emit('connectivityStateChanged', newState);
}
}
private startConnecting(): void {
this.transitionToState(ConnectivityState.CONNECTING);
let subChannel: http2.ClientHttp2Session;
let secureContext = this.credentials.getSecureContext();
if (secureContext === null) {
this.subChannel = http2.connect(this.address);
subChannel = http2.connect(this.address);
} else {
this.subChannel = http2.connect(this.address, {secureContext});
}
this.subChannel.once('connect', () => {
this.transitionToState(ConnectivityState.READY);
});
this.subChannel.setTimeout(IDLE_TIMEOUT_MS, () => {
this.goIdle();
});
/* TODO(murgatroid99): add connection-level error handling with exponential
* reconnection backoff */
}
private goIdle(): void {
if (this.subChannel !== null) {
this.subChannel.shutdown({graceful: true}, () => undefined);
this.subChannel = null;
}
this.transitionToState(ConnectivityState.IDLE);
}
private kickConnectivityState(): void {
if (this.connectivityState === ConnectivityState.IDLE) {
this.startConnecting();
subChannel = http2.connect(this.address, {secureContext});
}
this.subChannel = subChannel;
let now = new Date();
let connectionTimeout: number = Math.max(
this.currentBackoffDeadline.getTime() - now.getTime(),
MIN_CONNECT_TIMEOUT_MS);
let connectionTimerId: NodeJS.Timer = setTimeout(() => {
// This should trigger the 'close' event, which will send us back to TRANSIENT_FAILURE
subChannel.shutdown();
}, connectionTimeout);
this.subChannelConnectCallback = () => {
// Connection succeeded
clearTimeout(connectionTimerId);
this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY);
};
subChannel.once('connect', this.subChannelConnectCallback);
this.subChannelCloseCallback = () => {
// Connection failed
clearTimeout(connectionTimerId);
/* TODO(murgatroid99): verify that this works for CONNECTING->TRANSITIVE_FAILURE
* see nodejs/node#16645 */
this.transitionToState([ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
};
subChannel.once('close', this.subChannelCloseCallback);
}
constructor(
@ -121,6 +179,7 @@ export class Http2Channel extends EventEmitter implements Channel {
new CompressionFilterFactory(this),
new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this)
]);
this.currentBackoffDeadline = new Date();
}
private startHttp2Stream(
@ -158,7 +217,7 @@ export class Http2Channel extends EventEmitter implements Channel {
}
createStream(methodName: string, metadata: Metadata, options: CallOptions):
CallStream {
CallStream {
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down');
}
@ -174,15 +233,11 @@ export class Http2Channel extends EventEmitter implements Channel {
}
connect(callback: () => void): void {
this.kickConnectivityState();
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING);
if (this.connectivityState === ConnectivityState.READY) {
setImmediate(callback);
} else {
this.once('connectivityStateChanged', (newState) => {
if (newState === ConnectivityState.READY) {
callback();
}
});
this.once('connect', callback);
}
}
@ -194,7 +249,10 @@ export class Http2Channel extends EventEmitter implements Channel {
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down');
}
this.transitionToState(ConnectivityState.SHUTDOWN);
this.transitionToState([ConnectivityState.CONNECTING,
ConnectivityState.READY,
ConnectivityState.TRANSIENT_FAILURE,
ConnectivityState.IDLE], ConnectivityState.SHUTDOWN);
if (this.subChannel !== null) {
this.subChannel.shutdown({graceful: true});
}