From 500642b5ac50979a20276fc2fdf65d912d6d64f4 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 3 Aug 2018 13:43:23 -0700 Subject: [PATCH] Create separate subchannel class, fix default keepalive time value --- packages/grpc-js-core/src/channel.ts | 103 ++---------------- packages/grpc-js-core/src/subchannel.ts | 134 ++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 94 deletions(-) create mode 100644 packages/grpc-js-core/src/subchannel.ts diff --git a/packages/grpc-js-core/src/channel.ts b/packages/grpc-js-core/src/channel.ts index 5fc3b916..19ea2080 100644 --- a/packages/grpc-js-core/src/channel.ts +++ b/packages/grpc-js-core/src/channel.ts @@ -13,6 +13,7 @@ import {DeadlineFilterFactory} from './deadline-filter'; import {FilterStackFactory} from './filter-stack'; import {Metadata, MetadataObject} from './metadata'; import {MetadataStatusFilterFactory} from './metadata-status-filter'; +import { Http2SubChannel } from './subchannel'; const {version: clientVersion} = require('../../package'); @@ -24,9 +25,6 @@ const BACKOFF_MULTIPLIER = 1.6; const MAX_BACKOFF_MS = 120000; const BACKOFF_JITTER = 0.2; -const KEEPALIVE_TIME_MS = 1 << 31; -const KEEPALIVE_TIMEOUT_MS = 20000; - const { HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_CONTENT_TYPE, @@ -45,6 +43,8 @@ export interface ChannelOptions { 'grpc.primary_user_agent': string; 'grpc.secondary_user_agent': string; 'grpc.default_authority': string; + 'grpc.keealive_time_ms': number; + 'grpc.keepalive_timeout_ms': number; [key: string]: string|number; } @@ -85,15 +85,6 @@ export interface Channel extends EventEmitter { /* tslint:enable:no-any */ } -/* This should be a real subchannel class that contains a ClientHttp2Session, - * but for now this serves its purpose */ -type Http2SubChannel = http2.ClientHttp2Session&{ - /* Count the number of currently active streams associated with the session. - * The purpose of this is to keep the session reffed if and only if there - * is at least one active stream */ - streamCount?: number; -}; - export class Http2Channel extends EventEmitter implements Channel { private readonly userAgent: string; private readonly target: url.URL; @@ -113,12 +104,6 @@ export class Http2Channel extends EventEmitter implements Channel { private currentBackoff: number = INITIAL_BACKOFF_MS; private currentBackoffDeadline: Date; - private keepaliveTimeMs: number = KEEPALIVE_TIME_MS; - private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; - private keepaliveIntervalId: NodeJS.Timer; - private keepaliveTimeoutId: NodeJS.Timer; - - private handleStateChange( oldState: ConnectivityState, newState: ConnectivityState): void { const now: Date = new Date(); @@ -143,8 +128,6 @@ export class Http2Channel extends EventEmitter implements Channel { break; case ConnectivityState.TRANSIENT_FAILURE: this.subChannel = null; - /* Stop keepalive pings when the subchannel disconnects */ - this.stopKeepalivePings(); this.backoffTimerId = setTimeout(() => { this.transitionToState( [ConnectivityState.TRANSIENT_FAILURE], @@ -161,7 +144,6 @@ export class Http2Channel extends EventEmitter implements Channel { this.subChannel = null; this.emit('shutdown'); clearTimeout(this.backoffTimerId); - this.stopKeepalivePings(); } break; default: @@ -181,14 +163,10 @@ export class Http2Channel extends EventEmitter implements Channel { } private startConnecting(): void { - let subChannel: Http2SubChannel; const secureContext = this.credentials.getSecureContext(); - if (secureContext === null) { - subChannel = http2.connect(this.target); - } else { - const connectionOptions: http2.SecureClientSessionOptions = { - secureContext, - }; + let connectionOptions: http2.SecureClientSessionOptions = {}; + if (secureContext !== null) { + connectionOptions.secureContext = secureContext; // If provided, the value of grpc.ssl_target_name_override should be used // to override the target hostname when checking server identity. // This option is used for testing only. @@ -201,8 +179,8 @@ export class Http2Channel extends EventEmitter implements Channel { }; connectionOptions.servername = sslTargetNameOverride; } - subChannel = http2.connect(this.target, connectionOptions); } + const subChannel: Http2SubChannel = new Http2SubChannel(this.target, connectionOptions, this.userAgent, this.options); this.subChannel = subChannel; const now = new Date(); const connectionTimeout: number = Math.max( @@ -230,33 +208,6 @@ export class Http2Channel extends EventEmitter implements Channel { ConnectivityState.TRANSIENT_FAILURE); }; subChannel.once('close', this.subChannelCloseCallback); - subChannel.once('error', this.subChannelCloseCallback); - } - - private sendPing() { - this.keepaliveTimeoutId = setTimeout(() => { - this.transitionToState([ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE) - }, this.keepaliveTimeoutMs) - this.subChannel!.ping((err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(this.keepaliveTimeoutId); - }); - } - - /* TODO(murgatroid99): refactor subchannels so that keepalives can be handled - * per subchannel */ - private startKeepalivePings() { - this.keepaliveIntervalId = setInterval(() => { - if (this.subChannel) { - this.sendPing(); - } - }, this.keepaliveTimeMs); - this.sendPing(); - } - - private stopKeepalivePings() { - clearInterval(this.keepaliveIntervalId); - clearTimeout(this.keepaliveTimeoutId); } constructor( @@ -278,21 +229,10 @@ export class Http2Channel extends EventEmitter implements Channel { new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this), new MetadataStatusFilterFactory(this), new CompressionFilterFactory(this) ]); - if (this.options['grpc.keepalive_time_ms']) { - this.keepaliveTimeMs = this.options['grpc.keepalive_time_ms'] as number; - } - if (this.options['grpc.keepalive_timeout_ms']) { - this.keepaliveTimeoutMs = this.options['grpc.keepalive_timeout_ms'] as number; - } this.currentBackoffDeadline = new Date(); /* The only purpose of these lines is to ensure that this.backoffTimerId has * a value of type NodeJS.Timer. */ this.backoffTimerId = setTimeout(() => {}, 0); - clearTimeout(this.backoffTimerId); - this.keepaliveIntervalId = setTimeout(() => {}, 0); - clearTimeout(this.keepaliveIntervalId); - this.keepaliveTimeoutId = setTimeout(() => {}, 0); - clearTimeout(this.keepaliveTimeoutId); // Build user-agent string. this.userAgent = [ @@ -316,33 +256,8 @@ export class Http2Channel extends EventEmitter implements Channel { headers[HTTP2_HEADER_PATH] = methodName; headers[HTTP2_HEADER_TE] = 'trailers'; if (this.connectivityState === ConnectivityState.READY) { - const session: Http2SubChannel = this.subChannel!; - let http2Stream = session.request(headers); - /* This is a very ad-hoc reference counting scheme. This should be - * handled by a subchannel class */ - session.ref(); - if (!session.streamCount) { - session.streamCount = 0; - } - if (session.streamCount == 0) { - /* Start keepalive pings when we start a stream on an empty - * session */ - this.startKeepalivePings(); - } - session.streamCount += 1; - http2Stream.on('close', () => { - if (!session.streamCount) { - session.streamCount = 0; - } - session.streamCount -= 1; - if (session.streamCount <= 0) { - session.unref(); - /* Stop keepalive pings when we end the last stream on a - * session */ - this.stopKeepalivePings(); - } - }); - stream.attachHttp2Stream(http2Stream); + const subChannel: Http2SubChannel = this.subChannel!; + subChannel.startCallStream(metadataValue, stream); } else { /* In this case, we lost the connection while finalizing * metadata. That should be very unusual */ diff --git a/packages/grpc-js-core/src/subchannel.ts b/packages/grpc-js-core/src/subchannel.ts new file mode 100644 index 00000000..717d5d6f --- /dev/null +++ b/packages/grpc-js-core/src/subchannel.ts @@ -0,0 +1,134 @@ +import * as http2 from 'http2'; +import * as url from 'url'; + +import { EventEmitter } from "events"; +import { Metadata } from "./metadata"; +import { CallStream, CallOptions, Http2CallStream } from "./call-stream"; +import { EmitterAugmentation1, EmitterAugmentation0 } from "./events"; +import { ChannelOptions } from './channel'; + +const { + HTTP2_HEADER_AUTHORITY, + HTTP2_HEADER_CONTENT_TYPE, + HTTP2_HEADER_METHOD, + HTTP2_HEADER_PATH, + HTTP2_HEADER_SCHEME, + HTTP2_HEADER_TE, + HTTP2_HEADER_USER_AGENT +} = http2.constants; + +/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't + * have a constant for the max signed 32 bit integer, so this is a simple way + * to calculate it */ +const KEEPALIVE_TIME_MS = ~(1 << 31); +const KEEPALIVE_TIMEOUT_MS = 20000; + +export interface SubChannel extends EventEmitter { + /** + * Attach a call stream to this subchannel's connection to start it + * @param headers The headers to start the stream with + * @param callStream The stream to start + */ + startCallStream(metadata: Metadata, callStream: CallStream): void; + close(): void; +} + +export class Http2SubChannel extends EventEmitter implements SubChannel { + private session: http2.ClientHttp2Session; + private refCount: number = 0; + private userAgent: string; + + private keepaliveTimeMs: number = KEEPALIVE_TIME_MS; + private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; + private keepaliveIntervalId: NodeJS.Timer; + private keepaliveTimeoutId: NodeJS.Timer; + + constructor(target: url.URL, connectionOptions: http2.SecureClientSessionOptions, + userAgent: string, channelArgs: Partial) { + super(); + this.session = http2.connect(target, connectionOptions); + this.session.on('connect', () => { + this.emit('connect'); + }); + this.session.on('close', () => { + this.stopKeepalivePings(); + this.emit('close'); + }); + this.session.on('error', () => { + this.stopKeepalivePings(); + this.emit('close'); + }) + this.userAgent = userAgent; + + if (channelArgs['grpc.keepalive_time_ms']) { + this.keepaliveTimeMs = channelArgs['grpc.keepalive_time_ms'] as number; + } + if (channelArgs['grpc.keepalive_timeout_ms']) { + this.keepaliveTimeoutMs = channelArgs['grpc.keepalive_timeout_ms'] as number; + } + this.keepaliveIntervalId = setTimeout(() => {}, 0); + clearTimeout(this.keepaliveIntervalId); + this.keepaliveTimeoutId = setTimeout(() => {}, 0); + clearTimeout(this.keepaliveTimeoutId); + } + + private ref() { + if (this.refCount === 0) { + this.session.ref(); + this.startKeepalivePings(); + } + this.refCount += 1; + } + + private unref() { + this.refCount -= 1; + if (this.refCount === 0) { + this.session.unref(); + this.stopKeepalivePings(); + } + } + + private sendPing() { + this.keepaliveTimeoutId = setTimeout(() => { + this.emit('close'); + }, this.keepaliveTimeoutMs); + this.session.ping((err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(this.keepaliveTimeoutId); + }); + } + + /* TODO(murgatroid99): refactor subchannels so that keepalives can be handled + * per subchannel */ + private startKeepalivePings() { + this.keepaliveIntervalId = setInterval(() => { + this.sendPing(); + }, this.keepaliveTimeMs); + this.sendPing(); + } + + private stopKeepalivePings() { + clearInterval(this.keepaliveIntervalId); + clearTimeout(this.keepaliveTimeoutId); + } + + // Prerequisite: this subchannel is connected + startCallStream(metadata: Metadata, callStream: Http2CallStream) { + const headers = metadata.toHttp2Headers(); + headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); + headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; + headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; + headers[HTTP2_HEADER_METHOD] = 'POST'; + headers[HTTP2_HEADER_PATH] = callStream.getMethod(); + headers[HTTP2_HEADER_TE] = 'trailers'; + let http2Stream = this.session.request(headers); + this.ref(); + http2Stream.on('close', () => { + this.unref(); + }); + callStream.attachHttp2Stream(http2Stream); + } + + close() { + this.session.close(); + } +} \ No newline at end of file