diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 439ed80e..2dfa4123 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -18,7 +18,7 @@ import * as protoLoader from '@grpc/proto-loader'; // This is a non-public, unstable API, but it's very convenient import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util'; -import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel } from '@grpc/grpc-js'; +import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel, connectivityState } from '@grpc/grpc-js'; import * as adsTypes from './generated/ads'; import * as lrsTypes from './generated/lrs'; import { loadBootstrapInfo } from './xds-bootstrap'; @@ -255,6 +255,7 @@ export class XdsClient { DiscoveryRequest, DiscoveryResponse__Output > | null = null; + private receivedAdsResponseOnCurrentStream = false; private lrsNode: Node | null = null; private lrsClient: LoadReportingServiceClient | null = null; @@ -373,6 +374,9 @@ export class XdsClient { {channelOverride: channel} ); this.maybeStartAdsStream(); + channel.watchConnectivityState(channel.getConnectivityState(false), Infinity, () => { + this.handleAdsConnectivityStateUpdate(); + }) this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService( serverUri, @@ -394,7 +398,29 @@ export class XdsClient { clearInterval(this.statsTimer); } + private handleAdsConnectivityStateUpdate() { + if (!this.adsClient) { + return; + } + const state = this.adsClient.getChannel().getConnectivityState(false); + if (state === connectivityState.READY && this.adsCall) { + this.reportAdsStreamStarted(); + } + if (state === connectivityState.TRANSIENT_FAILURE) { + this.reportStreamError({ + code: status.UNAVAILABLE, + details: 'No connection established to xDS server', + metadata: new Metadata() + }); + } + this.adsClient.getChannel().watchConnectivityState(state, Infinity, () => { + this.handleAdsConnectivityStateUpdate(); + }); + } + private handleAdsResponse(message: DiscoveryResponse__Output) { + this.receivedAdsResponseOnCurrentStream = true; + this.adsBackoff.reset(); let handleResponseResult: { result: HandleResponseResult; serviceKind: AdsServiceKind; @@ -466,7 +492,7 @@ export class XdsClient { 'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details ); this.adsCall = null; - if (streamStatus.code !== status.OK) { + if (streamStatus.code !== status.OK && !this.receivedAdsResponseOnCurrentStream) { this.reportStreamError(streamStatus); } /* If the backoff timer is no longer running, we do not need to wait any @@ -496,7 +522,9 @@ export class XdsClient { if (this.adsCall !== null) { return; } - this.adsCall = this.adsClient.StreamAggregatedResources(); + this.receivedAdsResponseOnCurrentStream = false; + const metadata = new Metadata({waitForReady: true}); + this.adsCall = this.adsClient.StreamAggregatedResources(metadata); this.adsCall.on('data', (message: DiscoveryResponse__Output) => { this.handleAdsResponse(message); }); @@ -515,7 +543,9 @@ export class XdsClient { this.updateNames(service); } } - this.reportAdsStreamStarted(); + if (this.adsClient.getChannel().getConnectivityState(false) === connectivityState.READY) { + this.reportAdsStreamStarted(); + } } private maybeSendAdsMessage(typeUrl: string, resourceNames: string[], responseNonce: string, versionInfo: string, errorMessage?: string) { @@ -547,10 +577,6 @@ export class XdsClient { * version info are updated so that it sends the post-update values. */ ack(serviceKind: AdsServiceKind) { - /* An ack is the best indication of a successful interaction between the - * client and the server, so we can reset the backoff timer here. */ - this.adsBackoff.reset(); - this.updateNames(serviceKind); } diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index 86c2cea4..e20bc7e9 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -213,6 +213,9 @@ export abstract class BaseXdsStreamState implements XdsStreamState } reportAdsStreamStart() { + if (this.isAdsStreamRunning) { + return; + } this.isAdsStreamRunning = true; for (const subscriptionEntry of this.subscriptions.values()) { if (subscriptionEntry.cachedResponse === null) {