From fcff72b9419fadca5400f5793e598366c7121ea2 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 20 Jun 2023 10:01:43 -0700 Subject: [PATCH] grpc-js: Implement channel idle timeout --- packages/grpc-js/README.md | 1 + packages/grpc-js/src/channel-options.ts | 2 + packages/grpc-js/src/internal-channel.ts | 92 +++++++++++++----- .../src/load-balancer-child-handler.ts | 4 + packages/grpc-js/src/resolver-dns.ts | 20 ++++ packages/grpc-js/src/resolver.ts | 5 +- .../grpc-js/src/resolving-load-balancer.ts | 12 ++- packages/grpc-js/test/common.ts | 90 +++++++++++++++++- packages/grpc-js/test/test-idle-timer.ts | 95 +++++++++++++++++++ 9 files changed, 292 insertions(+), 29 deletions(-) create mode 100644 packages/grpc-js/test/test-idle-timer.ts diff --git a/packages/grpc-js/README.md b/packages/grpc-js/README.md index 652ce5fe..3b83433e 100644 --- a/packages/grpc-js/README.md +++ b/packages/grpc-js/README.md @@ -63,6 +63,7 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`. - `grpc.per_rpc_retry_buffer_size` - `grpc.retry_buffer_size` - `grpc.service_config_disable_resolution` + - `grpc.client_idle_timeout_ms` - `grpc-node.max_session_memory` - `channelOverride` - `channelFactoryOverride` diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index a41b89e9..cf310319 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -56,6 +56,7 @@ export interface ChannelOptions { 'grpc.max_connection_age_grace_ms'?: number; 'grpc-node.max_session_memory'?: number; 'grpc.service_config_disable_resolution'?: number; + 'grpc.client_idle_timeout_ms'?: number; // eslint-disable-next-line @typescript-eslint/no-explicit-any [key: string]: any; } @@ -89,6 +90,7 @@ export const recognizedOptions = { 'grpc.max_connection_age_grace_ms': true, 'grpc-node.max_session_memory': true, 'grpc.service_config_disable_resolution': true, + 'grpc.client_idle_timeout_ms': true }; export function channelOptionsEqual( diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 3f3ca5d7..6a11028e 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options'; import { ResolvingLoadBalancer } from './resolving-load-balancer'; import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; import { ChannelControlHelper } from './load-balancer'; -import { UnavailablePicker, Picker, PickResultType } from './picker'; +import { UnavailablePicker, Picker, PickResultType, QueuePicker } from './picker'; import { Metadata } from './metadata'; import { Status, LogVerbosity, Propagate } from './constants'; import { FilterStackFactory } from './filter-stack'; @@ -85,6 +85,11 @@ import { */ const MAX_TIMEOUT_TIME = 2147483647; +const MIN_IDLE_TIMEOUT_MS = 1000; + +// 30 minutes +const DEFAULT_IDLE_TIMEOUT_MS = 30 * 60 * 1000; + interface ConnectivityStateWatcher { currentState: ConnectivityState; timer: NodeJS.Timeout | null; @@ -153,8 +158,8 @@ class ChannelSubchannelWrapper } export class InternalChannel { - private resolvingLoadBalancer: ResolvingLoadBalancer; - private subchannelPool: SubchannelPool; + private readonly resolvingLoadBalancer: ResolvingLoadBalancer; + private readonly subchannelPool: SubchannelPool; private connectivityState: ConnectivityState = ConnectivityState.IDLE; private currentPicker: Picker = new UnavailablePicker(); /** @@ -164,9 +169,9 @@ export class InternalChannel { private configSelectionQueue: ResolvingCall[] = []; private pickQueue: LoadBalancingCall[] = []; private connectivityStateWatchers: ConnectivityStateWatcher[] = []; - private defaultAuthority: string; - private filterStackFactory: FilterStackFactory; - private target: GrpcUri; + private readonly defaultAuthority: string; + private readonly filterStackFactory: FilterStackFactory; + private readonly target: GrpcUri; /** * This timer does not do anything on its own. Its purpose is to hold the * event loop open while there are any pending calls for the channel that @@ -174,7 +179,7 @@ export class InternalChannel { * the invariant is that callRefTimer is reffed if and only if pickQueue * is non-empty. */ - private callRefTimer: NodeJS.Timer; + private readonly callRefTimer: NodeJS.Timer; private configSelector: ConfigSelector | null = null; /** * This is the error from the name resolver if it failed most recently. It @@ -184,17 +189,21 @@ export class InternalChannel { * than TRANSIENT_FAILURE. */ private currentResolutionError: StatusObject | null = null; - private retryBufferTracker: MessageBufferTracker; + private readonly retryBufferTracker: MessageBufferTracker; private keepaliveTime: number; - private wrappedSubchannels: Set = new Set(); + private readonly wrappedSubchannels: Set = new Set(); + + private callCount: number = 0; + private idleTimer: NodeJS.Timer | null = null; + private readonly idleTimeoutMs: number; // Channelz info private readonly channelzEnabled: boolean = true; - private originalTarget: string; - private channelzRef: ChannelRef; - private channelzTrace: ChannelzTrace; - private callTracker = new ChannelzCallTracker(); - private childrenTracker = new ChannelzChildrenTracker(); + private readonly originalTarget: string; + private readonly channelzRef: ChannelRef; + private readonly channelzTrace: ChannelzTrace; + private readonly callTracker = new ChannelzCallTracker(); + private readonly childrenTracker = new ChannelzChildrenTracker(); constructor( target: string, @@ -265,6 +274,7 @@ export class InternalChannel { DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES ); this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1; + this.idleTimeoutMs = Math.max(options['grpc.client_idle_timeout_ms'] ?? DEFAULT_IDLE_TIMEOUT_MS, MIN_IDLE_TIMEOUT_MS); const channelControlHelper: ChannelControlHelper = { createSubchannel: ( subchannelAddress: SubchannelAddress, @@ -548,6 +558,45 @@ export class InternalChannel { this.callRefTimerRef(); } + private enterIdle() { + this.resolvingLoadBalancer.destroy(); + this.updateState(ConnectivityState.IDLE); + this.currentPicker = new QueuePicker(this.resolvingLoadBalancer); + } + + private maybeStartIdleTimer() { + if (this.callCount === 0) { + this.idleTimer = setTimeout(() => { + this.trace('Idle timer triggered after ' + this.idleTimeoutMs + 'ms of inactivity'); + this.enterIdle(); + }, this.idleTimeoutMs); + this.idleTimer.unref?.(); + } + } + + private onCallStart() { + if (this.channelzEnabled) { + this.callTracker.addCallStarted(); + } + this.callCount += 1; + if (this.idleTimer) { + clearTimeout(this.idleTimer); + this.idleTimer = null; + } + } + + private onCallEnd(status: StatusObject) { + if (this.channelzEnabled) { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + } + this.callCount -= 1; + this.maybeStartIdleTimer(); + } + createLoadBalancingCall( callConfig: CallConfig, method: string, @@ -653,16 +702,10 @@ export class InternalChannel { callNumber ); - if (this.channelzEnabled) { - this.callTracker.addCallStarted(); - call.addStatusWatcher(status => { - if (status.code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }); - } + this.onCallStart(); + call.addStatusWatcher(status => { + this.onCallEnd(status); + }); return call; } @@ -685,6 +728,7 @@ export class InternalChannel { const connectivityState = this.connectivityState; if (tryToConnect) { this.resolvingLoadBalancer.exitIdle(); + this.maybeStartIdleTimer(); } return connectivityState; } diff --git a/packages/grpc-js/src/load-balancer-child-handler.ts b/packages/grpc-js/src/load-balancer-child-handler.ts index b556db0c..a4dc90c4 100644 --- a/packages/grpc-js/src/load-balancer-child-handler.ts +++ b/packages/grpc-js/src/load-balancer-child-handler.ts @@ -150,6 +150,10 @@ export class ChildLoadBalancerHandler implements LoadBalancer { } } destroy(): void { + /* Note: state updates are only propagated from the child balancer if that + * object is equal to this.currentChild or this.pendingChild. Since this + * function sets both of those to null, no further state updates will + * occur after this function returns. */ if (this.currentChild) { this.currentChild.destroy(); this.currentChild = null; diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index f5ea6ad4..7f84e0c5 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -207,6 +207,9 @@ class DnsResolver implements Resolver { this.pendingLookupPromise = dnsLookupPromise(hostname, { all: true }); this.pendingLookupPromise.then( addressList => { + if (this.pendingLookupPromise === null) { + return; + } this.pendingLookupPromise = null; this.backoff.reset(); this.backoff.stop(); @@ -248,6 +251,9 @@ class DnsResolver implements Resolver { ); }, err => { + if (this.pendingLookupPromise === null) { + return; + } trace( 'Resolution error for target ' + uriToString(this.target) + @@ -268,6 +274,9 @@ class DnsResolver implements Resolver { this.pendingTxtPromise = resolveTxtPromise(hostname); this.pendingTxtPromise.then( txtRecord => { + if (this.pendingTxtPromise === null) { + return; + } this.pendingTxtPromise = null; try { this.latestServiceConfig = extractAndSelectServiceConfig( @@ -348,10 +357,21 @@ class DnsResolver implements Resolver { } } + /** + * Reset the resolver to the same state it had when it was created. In-flight + * DNS requests cannot be cancelled, but they are discarded and their results + * will be ignored. + */ destroy() { this.continueResolving = false; + this.backoff.reset(); this.backoff.stop(); this.stopNextResolutionTimer(); + this.pendingLookupPromise = null; + this.pendingTxtPromise = null; + this.latestLookupResult = null; + this.latestServiceConfig = null; + this.latestServiceConfigError = null; } /** diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 77000448..43508625 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -82,7 +82,10 @@ export interface Resolver { updateResolution(): void; /** - * Destroy the resolver. Should be called when the owning channel shuts down. + * Discard all resources owned by the resolver. A later call to + * `updateResolution` should reinitialize those resources. No + * `ResolverListener` callbacks should be called after `destroy` is called + * until `updateResolution` is called again. */ destroy(): void; } diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 064053bc..16533354 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -94,9 +94,9 @@ export class ResolvingLoadBalancer implements LoadBalancer { /** * The resolver class constructed for the target address. */ - private innerResolver: Resolver; + private readonly innerResolver: Resolver; - private childLoadBalancer: ChildLoadBalancerHandler; + private readonly childLoadBalancer: ChildLoadBalancerHandler; private latestChildState: ConnectivityState = ConnectivityState.IDLE; private latestChildPicker: Picker = new QueuePicker(this); /** @@ -324,7 +324,13 @@ export class ResolvingLoadBalancer implements LoadBalancer { destroy() { this.childLoadBalancer.destroy(); this.innerResolver.destroy(); - this.updateState(ConnectivityState.SHUTDOWN, new UnavailablePicker()); + this.backoffTimeout.reset(); + this.backoffTimeout.stop(); + this.latestChildState = ConnectivityState.IDLE; + this.latestChildPicker = new QueuePicker(this); + this.currentState = ConnectivityState.IDLE; + this.previousServiceConfig = null; + this.continueResolving = false; } getTypeName() { diff --git a/packages/grpc-js/test/common.ts b/packages/grpc-js/test/common.ts index 16b393b5..14fec91f 100644 --- a/packages/grpc-js/test/common.ts +++ b/packages/grpc-js/test/common.ts @@ -17,8 +17,11 @@ import * as loader from '@grpc/proto-loader'; import * as assert2 from './assert2'; +import * as path from 'path'; +import * as grpc from '../src'; -import { GrpcObject, loadPackageDefinition } from '../src/make-client'; +import { GrpcObject, ServiceClientConstructor, ServiceClient, loadPackageDefinition } from '../src/make-client'; +import { readFileSync } from 'fs'; const protoLoaderOptions = { keepCase: true, @@ -37,4 +40,89 @@ export function loadProtoFile(file: string): GrpcObject { return loadPackageDefinition(packageDefinition); } +const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); +const echoService = loadProtoFile(protoFile) + .EchoService as ServiceClientConstructor; + +const ca = readFileSync(path.join(__dirname, 'fixtures', 'ca.pem')); +const key = readFileSync(path.join(__dirname, 'fixtures', 'server1.key')); +const cert = readFileSync(path.join(__dirname, 'fixtures', 'server1.pem')); + +const serviceImpl = { + echo: ( + call: grpc.ServerUnaryCall, + callback: grpc.sendUnaryData + ) => { + callback(null, call.request); + }, +}; + +export class TestServer { + private server: grpc.Server; + public port: number | null = null; + constructor(public useTls: boolean, options?: grpc.ChannelOptions) { + this.server = new grpc.Server(options); + this.server.addService(echoService.service, serviceImpl); + } + start(): Promise { + let credentials: grpc.ServerCredentials; + if (this.useTls) { + credentials = grpc.ServerCredentials.createSsl(null, [{private_key: key, cert_chain: cert}]); + } else { + credentials = grpc.ServerCredentials.createInsecure(); + } + return new Promise((resolve, reject) => { + this.server.bindAsync('localhost:0', credentials, (error, port) => { + if (error) { + reject(error); + return; + } + this.port = port; + this.server.start(); + resolve(); + }); + }); + } + + shutdown() { + this.server.forceShutdown(); + } +} + +export class TestClient { + private client: ServiceClient; + constructor(port: number, useTls: boolean, options?: grpc.ChannelOptions) { + let credentials: grpc.ChannelCredentials; + if (useTls) { + credentials = grpc.credentials.createSsl(ca); + } else { + credentials = grpc.credentials.createInsecure(); + } + this.client = new echoService(`localhost:${port}`, credentials, options); + } + + static createFromServer(server: TestServer, options?: grpc.ChannelOptions) { + if (server.port === null) { + throw new Error('Cannot create client, server not started'); + } + return new TestClient(server.port, server.useTls, options); + } + + waitForReady(deadline: grpc.Deadline, callback: (error?: Error) => void) { + this.client.waitForReady(deadline, callback); + } + + sendRequest(callback: (error: grpc.ServiceError) => void) { + this.client.echo({}, callback); + } + + getChannelState() { + return this.client.getChannel().getConnectivityState(false); + } + + close() { + this.client.close(); + } +} + export { assert2 }; diff --git a/packages/grpc-js/test/test-idle-timer.ts b/packages/grpc-js/test/test-idle-timer.ts new file mode 100644 index 00000000..646d0e07 --- /dev/null +++ b/packages/grpc-js/test/test-idle-timer.ts @@ -0,0 +1,95 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as assert from 'assert'; +import * as grpc from '../src'; +import { TestClient, TestServer } from "./common"; + +describe('Channel idle timer', () => { + let server: TestServer; + let client: TestClient | null = null; + before(() => { + server = new TestServer(false); + return server.start(); + }); + afterEach(() => { + if (client) { + client.close(); + client = null; + } + }) + after(() => { + server.shutdown(); + }); + it('Should go idle after the specified time after a request ends', function(done) { + this.timeout(5000); + client = TestClient.createFromServer(server, {'grpc.client_idle_timeout_ms': 1000}); + client.sendRequest(error => { + assert.ifError(error); + assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY); + setTimeout(() => { + assert.strictEqual(client!.getChannelState(), grpc.connectivityState.IDLE); + done(); + }, 1100); + }); + }); + it('Should be able to make a request after going idle', function(done) { + this.timeout(5000); + client = TestClient.createFromServer(server, {'grpc.client_idle_timeout_ms': 1000}); + client.sendRequest(error => { + assert.ifError(error); + assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY); + setTimeout(() => { + assert.strictEqual(client!.getChannelState(), grpc.connectivityState.IDLE); + client!.sendRequest(error => { + assert.ifError(error); + done(); + }); + }, 1100); + }); + }); + it('Should go idle after the specified time after waitForReady ends', function(done) { + this.timeout(5000); + client = TestClient.createFromServer(server, {'grpc.client_idle_timeout_ms': 1000}); + const deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 3); + client.waitForReady(deadline, error => { + assert.ifError(error); + assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY); + setTimeout(() => { + assert.strictEqual(client!.getChannelState(), grpc.connectivityState.IDLE); + done(); + }, 1100); + }); + }); + it('Should ensure that the timeout is at least 1 second', function(done) { + client = TestClient.createFromServer(server, {'grpc.client_idle_timeout_ms': 50}); + client.sendRequest(error => { + assert.ifError(error); + assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY); + setTimeout(() => { + // Should still be ready after 100ms + assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY); + setTimeout(() => { + // Should go IDLE after another second + assert.strictEqual(client!.getChannelState(), grpc.connectivityState.IDLE); + done(); + }, 1000); + }, 100); + }); + }) +}); \ No newline at end of file