diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index bd2235ab..5b5ed550 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -54,6 +54,7 @@ import { Listener__Output } from './generated/envoy/api/v2/Listener'; import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager'; import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration'; import { Any__Output } from './generated/google/protobuf/Any'; +import { BackoffTimeout } from './backoff-timeout'; const TRACER_NAME = 'xds_client'; @@ -260,6 +261,7 @@ class EdsState implements XdsStreamState { edsServiceName: string, watcher: Watcher ): void { + trace('Adding EDS watcher for edsServiceName ' + edsServiceName); let watchersEntry = this.watchers.get(edsServiceName); let addedServiceName = false; if (watchersEntry === undefined) { @@ -276,6 +278,7 @@ class EdsState implements XdsStreamState { /* These updates normally occur asynchronously, so we ensure that * the same happens here */ process.nextTick(() => { + trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName); watcher.onValidUpdate(message); }); } @@ -289,6 +292,7 @@ class EdsState implements XdsStreamState { edsServiceName: string, watcher: Watcher ): void { + trace('Removing EDS watcher for edsServiceName ' + edsServiceName); const watchersEntry = this.watchers.get(edsServiceName); let removedServiceName = false; if (watchersEntry !== undefined) { @@ -342,6 +346,7 @@ class EdsState implements XdsStreamState { handleMissingNames(allEdsServiceNames: Set) { for (const [edsServiceName, watcherList] of this.watchers.entries()) { if (!allEdsServiceNames.has(edsServiceName)) { + trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName); for (const watcher of watcherList) { watcher.onResourceDoesNotExist(); } @@ -352,7 +357,7 @@ class EdsState implements XdsStreamState { handleResponses(responses: ClusterLoadAssignment__Output[]) { for (const message of responses) { if (!this.validateResponse(message)) { - return 'ClusterLoadAssignment validation failed'; + return 'EDS Error: ClusterLoadAssignment validation failed'; } } this.latestResponses = responses; @@ -364,6 +369,7 @@ class EdsState implements XdsStreamState { watcher.onValidUpdate(message); } } + trace('Received EDS updates for cluster names ' + Array.from(allClusterNames)); this.handleMissingNames(allClusterNames); return null; } @@ -400,6 +406,7 @@ class CdsState implements XdsStreamState { * @param watcher */ addWatcher(clusterName: string, watcher: Watcher): void { + trace('Adding CDS watcher for clusterName ' + clusterName); let watchersEntry = this.watchers.get(clusterName); let addedServiceName = false; if (watchersEntry === undefined) { @@ -416,6 +423,7 @@ class CdsState implements XdsStreamState { /* These updates normally occur asynchronously, so we ensure that * the same happens here */ process.nextTick(() => { + trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName); watcher.onValidUpdate(message); }); } @@ -426,6 +434,7 @@ class CdsState implements XdsStreamState { } removeWatcher(clusterName: string, watcher: Watcher): void { + trace('Removing CDS watcher for clusterName ' + clusterName); const watchersEntry = this.watchers.get(clusterName); let removedServiceName = false; if (watchersEntry !== undefined) { @@ -466,14 +475,15 @@ class CdsState implements XdsStreamState { } /** - * Given a list of edsServiceNames (which may actually be the cluster name), + * Given a list of clusterNames (which may actually be the cluster name), * for each watcher watching a name not on the list, call that watcher's * onResourceDoesNotExist method. * @param allClusterNames */ private handleMissingNames(allClusterNames: Set) { - for (const [edsServiceName, watcherList] of this.watchers.entries()) { - if (!allClusterNames.has(edsServiceName)) { + for (const [clusterName, watcherList] of this.watchers.entries()) { + if (!allClusterNames.has(clusterName)) { + trace('Reporting CDS resource does not exist for clusterName ' + clusterName); for (const watcher of watcherList) { watcher.onResourceDoesNotExist(); } @@ -484,7 +494,7 @@ class CdsState implements XdsStreamState { handleResponses(responses: Cluster__Output[]): string | null { for (const message of responses) { if (!this.validateResponse(message)) { - return 'Cluster validation failed'; + return 'CDS Error: Cluster validation failed'; } } this.latestResponses = responses; @@ -501,6 +511,7 @@ class CdsState implements XdsStreamState { watcher.onValidUpdate(message); } } + trace('Received CDS updates for cluster names ' + Array.from(allClusterNames)); this.handleMissingNames(allClusterNames); this.edsState.handleMissingNames(allEdsServiceNames); return null; @@ -535,6 +546,7 @@ class RdsState implements XdsStreamState { if (virtualHost.domains.indexOf(this.routeConfigName!) >= 0) { const route = virtualHost.routes[virtualHost.routes.length - 1]; if (route.match?.prefix === '' && route.route?.cluster) { + trace('Reporting RDS update for host ' + this.routeConfigName + ' with cluster ' + route.route.cluster); this.watcher.onValidUpdate({ methodConfig: [], loadBalancingConfig: [ @@ -546,10 +558,11 @@ class RdsState implements XdsStreamState { }, ], }); - break; + return; } } } + trace('Reporting RDS resource does not exist'); /* If none of the routes match the one we are looking for, bubble up an * error. */ this.watcher.onResourceDoesNotExist(); @@ -623,11 +636,13 @@ class LdsState implements XdsStreamState { HttpConnectionManager__Output; switch (httpConnectionManager.route_specifier) { case 'rds': + trace('Received LDS update with RDS route config name ' + httpConnectionManager.rds!.route_config_name); this.rdsState.setRouteConfigName( httpConnectionManager.rds!.route_config_name ); break; case 'route_config': + trace('Received LDS update with route configuration'); this.rdsState.setRouteConfigName(null); this.rdsState.handleSingleMessage( httpConnectionManager.route_config! @@ -637,7 +652,7 @@ class LdsState implements XdsStreamState { // The validation rules should prevent this } } else { - return 'Listener validation failed'; + return 'LRS Error: Listener validation failed'; } } } @@ -677,11 +692,11 @@ function getResponseMessages( result.push(resource as protoLoader.AnyExtension & OutputType); } else { throw new Error( - `Invalid resource type ${ + `ADS Error: Invalid resource type ${ protoLoader.isAnyExtension(resource) ? resource['@type'] : resource.type_url - }` + }, expected ${typeUrl}` ); } } @@ -711,6 +726,9 @@ export class XdsClient { private adsState: AdsState; + private adsBackoff: BackoffTimeout; + private lrsBackoff: BackoffTimeout; + constructor( targetName: string, serviceConfigWatcher: Watcher, @@ -752,6 +770,14 @@ export class XdsClient { delete channelArgs[arg]; } channelArgs['grpc.keepalive_time_ms'] = 5000; + + this.adsBackoff = new BackoffTimeout(() => { + this.maybeStartAdsStream(); + }); + this.lrsBackoff = new BackoffTimeout(() => { + this.maybeStartLrsStream(); + }) + Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then( ([bootstrapInfo, protoDefinitions]) => { if (this.hasShutdown) { @@ -770,6 +796,7 @@ export class XdsClient { ...node, client_features: ['envoy.lrs.supports_send_all_clusters'], }; + trace('Starting xDS client connected to server URI ' + bootstrapInfo.xdsServers[0].serverUri); this.adsClient = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService( bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials(), @@ -828,6 +855,7 @@ export class XdsClient { errorString = `Unknown type_url ${message.type_url}`; } if (errorString === null) { + trace('Acking message with type URL ' + message.type_url); /* errorString can only be null in one of the first 4 cases, which * implies that message.type_url is one of the 4 known type URLs, which * means that this type assertion is valid. */ @@ -836,6 +864,7 @@ export class XdsClient { this.adsState[typeUrl].versionInfo = message.version_info; this.ack(typeUrl); } else { + trace('Nacking message with type URL ' + message.type_url + ': "' + errorString + '"'); this.nack(message.type_url, errorString); } } @@ -854,6 +883,9 @@ export class XdsClient { if (this.hasShutdown) { return; } + trace('Starting ADS stream'); + // Backoff relative to when we start the request + this.adsBackoff.runOnce(); this.adsCall = this.adsClient.StreamAggregatedResources(); this.adsCall.on('data', (message: DiscoveryResponse__Output) => { this.handleAdsResponse(message); @@ -864,10 +896,11 @@ export class XdsClient { ); this.adsCall = null; this.reportStreamError(error); - /* Connection backoff is handled by the client object, so we can - * immediately start a new request to indicate that it should try to - * reconnect */ - this.maybeStartAdsStream(); + /* If the backoff timer is no longer running, we do not need to wait any + * more to start the new call. */ + if (!this.adsBackoff.isRunning()) { + this.maybeStartAdsStream(); + } }); const allTypeUrls: AdsTypeUrl[] = [ @@ -889,6 +922,11 @@ export class XdsClient { * version info are updated so that it sends the post-update values. */ ack(typeUrl: AdsTypeUrl) { + /* An ack is the best indication of a successful interaction between the + * client and the server, so we can reset the backoff timer here. */ + this.adsBackoff.stop(); + this.adsBackoff.reset(); + this.updateNames(typeUrl); } @@ -953,8 +991,17 @@ export class XdsClient { if (this.hasShutdown) { return; } + + trace('Starting LRS stream'); + this.lrsBackoff.runOnce(); this.lrsCall = this.lrsClient.streamLoadStats(); + this.lrsCall.on('metadata', () => { + /* Once we get any response from the server, we assume that the stream is + * in a good state, so we can reset the backoff timer. */ + this.lrsBackoff.stop(); + this.lrsBackoff.reset(); + }); this.lrsCall.on('data', (message: LoadStatsResponse__Output) => { if ( message.load_reporting_interval?.seconds !== @@ -970,7 +1017,7 @@ export class XdsClient { const loadReportingIntervalMs = Number.parseInt(message.load_reporting_interval!.seconds) * 1000 + message.load_reporting_interval!.nanos / 1_000_000; - setInterval(() => { + this.statsTimer = setInterval(() => { this.sendStats(); }, loadReportingIntervalMs); } @@ -982,10 +1029,11 @@ export class XdsClient { ); this.lrsCall = null; clearInterval(this.statsTimer); - /* Connection backoff is handled by the client object, so we can - * immediately start a new request to indicate that it should try to - * reconnect */ - this.maybeStartAdsStream(); + /* If the backoff timer is no longer running, we do not need to wait any + * more to start the new call. */ + if (!this.lrsBackoff.isRunning()) { + this.maybeStartLrsStream(); + } }); this.lrsCall.write({ node: this.lrsNode!,