grpc-js: Implement channel idle timeout

This commit is contained in:
Michael Lumish 2023-06-20 10:01:43 -07:00
parent dbaaa89a08
commit fcff72b941
9 changed files with 292 additions and 29 deletions

View File

@ -63,6 +63,7 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.per_rpc_retry_buffer_size`
- `grpc.retry_buffer_size`
- `grpc.service_config_disable_resolution`
- `grpc.client_idle_timeout_ms`
- `grpc-node.max_session_memory`
- `channelOverride`
- `channelFactoryOverride`

View File

@ -56,6 +56,7 @@ export interface ChannelOptions {
'grpc.max_connection_age_grace_ms'?: number;
'grpc-node.max_session_memory'?: number;
'grpc.service_config_disable_resolution'?: number;
'grpc.client_idle_timeout_ms'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
}
@ -89,6 +90,7 @@ export const recognizedOptions = {
'grpc.max_connection_age_grace_ms': true,
'grpc-node.max_session_memory': true,
'grpc.service_config_disable_resolution': true,
'grpc.client_idle_timeout_ms': true
};
export function channelOptionsEqual(

View File

@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options';
import { ResolvingLoadBalancer } from './resolving-load-balancer';
import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, PickResultType } from './picker';
import { UnavailablePicker, Picker, PickResultType, QueuePicker } from './picker';
import { Metadata } from './metadata';
import { Status, LogVerbosity, Propagate } from './constants';
import { FilterStackFactory } from './filter-stack';
@ -85,6 +85,11 @@ import {
*/
const MAX_TIMEOUT_TIME = 2147483647;
const MIN_IDLE_TIMEOUT_MS = 1000;
// 30 minutes
const DEFAULT_IDLE_TIMEOUT_MS = 30 * 60 * 1000;
interface ConnectivityStateWatcher {
currentState: ConnectivityState;
timer: NodeJS.Timeout | null;
@ -153,8 +158,8 @@ class ChannelSubchannelWrapper
}
export class InternalChannel {
private resolvingLoadBalancer: ResolvingLoadBalancer;
private subchannelPool: SubchannelPool;
private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
private readonly subchannelPool: SubchannelPool;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private currentPicker: Picker = new UnavailablePicker();
/**
@ -164,9 +169,9 @@ export class InternalChannel {
private configSelectionQueue: ResolvingCall[] = [];
private pickQueue: LoadBalancingCall[] = [];
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
private defaultAuthority: string;
private filterStackFactory: FilterStackFactory;
private target: GrpcUri;
private readonly defaultAuthority: string;
private readonly filterStackFactory: FilterStackFactory;
private readonly target: GrpcUri;
/**
* This timer does not do anything on its own. Its purpose is to hold the
* event loop open while there are any pending calls for the channel that
@ -174,7 +179,7 @@ export class InternalChannel {
* the invariant is that callRefTimer is reffed if and only if pickQueue
* is non-empty.
*/
private callRefTimer: NodeJS.Timer;
private readonly callRefTimer: NodeJS.Timer;
private configSelector: ConfigSelector | null = null;
/**
* This is the error from the name resolver if it failed most recently. It
@ -184,17 +189,21 @@ export class InternalChannel {
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private readonly retryBufferTracker: MessageBufferTracker;
private keepaliveTime: number;
private wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();
private readonly wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();
private callCount: number = 0;
private idleTimer: NodeJS.Timer | null = null;
private readonly idleTimeoutMs: number;
// Channelz info
private readonly channelzEnabled: boolean = true;
private originalTarget: string;
private channelzRef: ChannelRef;
private channelzTrace: ChannelzTrace;
private callTracker = new ChannelzCallTracker();
private childrenTracker = new ChannelzChildrenTracker();
private readonly originalTarget: string;
private readonly channelzRef: ChannelRef;
private readonly channelzTrace: ChannelzTrace;
private readonly callTracker = new ChannelzCallTracker();
private readonly childrenTracker = new ChannelzChildrenTracker();
constructor(
target: string,
@ -265,6 +274,7 @@ export class InternalChannel {
DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
this.idleTimeoutMs = Math.max(options['grpc.client_idle_timeout_ms'] ?? DEFAULT_IDLE_TIMEOUT_MS, MIN_IDLE_TIMEOUT_MS);
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
@ -548,6 +558,45 @@ export class InternalChannel {
this.callRefTimerRef();
}
private enterIdle() {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.IDLE);
this.currentPicker = new QueuePicker(this.resolvingLoadBalancer);
}
private maybeStartIdleTimer() {
if (this.callCount === 0) {
this.idleTimer = setTimeout(() => {
this.trace('Idle timer triggered after ' + this.idleTimeoutMs + 'ms of inactivity');
this.enterIdle();
}, this.idleTimeoutMs);
this.idleTimer.unref?.();
}
}
private onCallStart() {
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
}
this.callCount += 1;
if (this.idleTimer) {
clearTimeout(this.idleTimer);
this.idleTimer = null;
}
}
private onCallEnd(status: StatusObject) {
if (this.channelzEnabled) {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
}
this.callCount -= 1;
this.maybeStartIdleTimer();
}
createLoadBalancingCall(
callConfig: CallConfig,
method: string,
@ -653,16 +702,10 @@ export class InternalChannel {
callNumber
);
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
call.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
}
this.onCallStart();
call.addStatusWatcher(status => {
this.onCallEnd(status);
});
return call;
}
@ -685,6 +728,7 @@ export class InternalChannel {
const connectivityState = this.connectivityState;
if (tryToConnect) {
this.resolvingLoadBalancer.exitIdle();
this.maybeStartIdleTimer();
}
return connectivityState;
}

View File

@ -150,6 +150,10 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
}
}
destroy(): void {
/* Note: state updates are only propagated from the child balancer if that
* object is equal to this.currentChild or this.pendingChild. Since this
* function sets both of those to null, no further state updates will
* occur after this function returns. */
if (this.currentChild) {
this.currentChild.destroy();
this.currentChild = null;

View File

@ -207,6 +207,9 @@ class DnsResolver implements Resolver {
this.pendingLookupPromise = dnsLookupPromise(hostname, { all: true });
this.pendingLookupPromise.then(
addressList => {
if (this.pendingLookupPromise === null) {
return;
}
this.pendingLookupPromise = null;
this.backoff.reset();
this.backoff.stop();
@ -248,6 +251,9 @@ class DnsResolver implements Resolver {
);
},
err => {
if (this.pendingLookupPromise === null) {
return;
}
trace(
'Resolution error for target ' +
uriToString(this.target) +
@ -268,6 +274,9 @@ class DnsResolver implements Resolver {
this.pendingTxtPromise = resolveTxtPromise(hostname);
this.pendingTxtPromise.then(
txtRecord => {
if (this.pendingTxtPromise === null) {
return;
}
this.pendingTxtPromise = null;
try {
this.latestServiceConfig = extractAndSelectServiceConfig(
@ -348,10 +357,21 @@ class DnsResolver implements Resolver {
}
}
/**
* Reset the resolver to the same state it had when it was created. In-flight
* DNS requests cannot be cancelled, but they are discarded and their results
* will be ignored.
*/
destroy() {
this.continueResolving = false;
this.backoff.reset();
this.backoff.stop();
this.stopNextResolutionTimer();
this.pendingLookupPromise = null;
this.pendingTxtPromise = null;
this.latestLookupResult = null;
this.latestServiceConfig = null;
this.latestServiceConfigError = null;
}
/**

View File

@ -82,7 +82,10 @@ export interface Resolver {
updateResolution(): void;
/**
* Destroy the resolver. Should be called when the owning channel shuts down.
* Discard all resources owned by the resolver. A later call to
* `updateResolution` should reinitialize those resources. No
* `ResolverListener` callbacks should be called after `destroy` is called
* until `updateResolution` is called again.
*/
destroy(): void;
}

View File

@ -94,9 +94,9 @@ export class ResolvingLoadBalancer implements LoadBalancer {
/**
* The resolver class constructed for the target address.
*/
private innerResolver: Resolver;
private readonly innerResolver: Resolver;
private childLoadBalancer: ChildLoadBalancerHandler;
private readonly childLoadBalancer: ChildLoadBalancerHandler;
private latestChildState: ConnectivityState = ConnectivityState.IDLE;
private latestChildPicker: Picker = new QueuePicker(this);
/**
@ -324,7 +324,13 @@ export class ResolvingLoadBalancer implements LoadBalancer {
destroy() {
this.childLoadBalancer.destroy();
this.innerResolver.destroy();
this.updateState(ConnectivityState.SHUTDOWN, new UnavailablePicker());
this.backoffTimeout.reset();
this.backoffTimeout.stop();
this.latestChildState = ConnectivityState.IDLE;
this.latestChildPicker = new QueuePicker(this);
this.currentState = ConnectivityState.IDLE;
this.previousServiceConfig = null;
this.continueResolving = false;
}
getTypeName() {

View File

@ -17,8 +17,11 @@
import * as loader from '@grpc/proto-loader';
import * as assert2 from './assert2';
import * as path from 'path';
import * as grpc from '../src';
import { GrpcObject, loadPackageDefinition } from '../src/make-client';
import { GrpcObject, ServiceClientConstructor, ServiceClient, loadPackageDefinition } from '../src/make-client';
import { readFileSync } from 'fs';
const protoLoaderOptions = {
keepCase: true,
@ -37,4 +40,89 @@ export function loadProtoFile(file: string): GrpcObject {
return loadPackageDefinition(packageDefinition);
}
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;
const ca = readFileSync(path.join(__dirname, 'fixtures', 'ca.pem'));
const key = readFileSync(path.join(__dirname, 'fixtures', 'server1.key'));
const cert = readFileSync(path.join(__dirname, 'fixtures', 'server1.pem'));
const serviceImpl = {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
callback(null, call.request);
},
};
export class TestServer {
private server: grpc.Server;
public port: number | null = null;
constructor(public useTls: boolean, options?: grpc.ChannelOptions) {
this.server = new grpc.Server(options);
this.server.addService(echoService.service, serviceImpl);
}
start(): Promise<void> {
let credentials: grpc.ServerCredentials;
if (this.useTls) {
credentials = grpc.ServerCredentials.createSsl(null, [{private_key: key, cert_chain: cert}]);
} else {
credentials = grpc.ServerCredentials.createInsecure();
}
return new Promise<void>((resolve, reject) => {
this.server.bindAsync('localhost:0', credentials, (error, port) => {
if (error) {
reject(error);
return;
}
this.port = port;
this.server.start();
resolve();
});
});
}
shutdown() {
this.server.forceShutdown();
}
}
export class TestClient {
private client: ServiceClient;
constructor(port: number, useTls: boolean, options?: grpc.ChannelOptions) {
let credentials: grpc.ChannelCredentials;
if (useTls) {
credentials = grpc.credentials.createSsl(ca);
} else {
credentials = grpc.credentials.createInsecure();
}
this.client = new echoService(`localhost:${port}`, credentials, options);
}
static createFromServer(server: TestServer, options?: grpc.ChannelOptions) {
if (server.port === null) {
throw new Error('Cannot create client, server not started');
}
return new TestClient(server.port, server.useTls, options);
}
waitForReady(deadline: grpc.Deadline, callback: (error?: Error) => void) {
this.client.waitForReady(deadline, callback);
}
sendRequest(callback: (error: grpc.ServiceError) => void) {
this.client.echo({}, callback);
}
getChannelState() {
return this.client.getChannel().getConnectivityState(false);
}
close() {
this.client.close();
}
}
export { assert2 };

View File

@ -0,0 +1,95 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as assert from 'assert';
import * as grpc from '../src';
import { TestClient, TestServer } from "./common";
describe('Channel idle timer', () => {
let server: TestServer;
let client: TestClient | null = null;
before(() => {
server = new TestServer(false);
return server.start();
});
afterEach(() => {
if (client) {
client.close();
client = null;
}
})
after(() => {
server.shutdown();
});
it('Should go idle after the specified time after a request ends', function(done) {
this.timeout(5000);
client = TestClient.createFromServer(server, {'grpc.client_idle_timeout_ms': 1000});
client.sendRequest(error => {
assert.ifError(error);
assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY);
setTimeout(() => {
assert.strictEqual(client!.getChannelState(), grpc.connectivityState.IDLE);
done();
}, 1100);
});
});
it('Should be able to make a request after going idle', function(done) {
this.timeout(5000);
client = TestClient.createFromServer(server, {'grpc.client_idle_timeout_ms': 1000});
client.sendRequest(error => {
assert.ifError(error);
assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY);
setTimeout(() => {
assert.strictEqual(client!.getChannelState(), grpc.connectivityState.IDLE);
client!.sendRequest(error => {
assert.ifError(error);
done();
});
}, 1100);
});
});
it('Should go idle after the specified time after waitForReady ends', function(done) {
this.timeout(5000);
client = TestClient.createFromServer(server, {'grpc.client_idle_timeout_ms': 1000});
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
client.waitForReady(deadline, error => {
assert.ifError(error);
assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY);
setTimeout(() => {
assert.strictEqual(client!.getChannelState(), grpc.connectivityState.IDLE);
done();
}, 1100);
});
});
it('Should ensure that the timeout is at least 1 second', function(done) {
client = TestClient.createFromServer(server, {'grpc.client_idle_timeout_ms': 50});
client.sendRequest(error => {
assert.ifError(error);
assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY);
setTimeout(() => {
// Should still be ready after 100ms
assert.strictEqual(client!.getChannelState(), grpc.connectivityState.READY);
setTimeout(() => {
// Should go IDLE after another second
assert.strictEqual(client!.getChannelState(), grpc.connectivityState.IDLE);
done();
}, 1000);
}, 100);
});
})
});