mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
grpc-js-xds: Update failure mode behavior
This commit is contained in:
parent
89e132ad3a
commit
641ed45d48
@ -18,7 +18,7 @@
|
||||
import * as protoLoader from '@grpc/proto-loader';
|
||||
// This is a non-public, unstable API, but it's very convenient
|
||||
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
|
||||
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel } from '@grpc/grpc-js';
|
||||
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel, connectivityState } from '@grpc/grpc-js';
|
||||
import * as adsTypes from './generated/ads';
|
||||
import * as lrsTypes from './generated/lrs';
|
||||
import { loadBootstrapInfo } from './xds-bootstrap';
|
||||
@ -255,6 +255,7 @@ export class XdsClient {
|
||||
DiscoveryRequest,
|
||||
DiscoveryResponse__Output
|
||||
> | null = null;
|
||||
private receivedAdsResponseOnCurrentStream = false;
|
||||
|
||||
private lrsNode: Node | null = null;
|
||||
private lrsClient: LoadReportingServiceClient | null = null;
|
||||
@ -373,6 +374,9 @@ export class XdsClient {
|
||||
{channelOverride: channel}
|
||||
);
|
||||
this.maybeStartAdsStream();
|
||||
channel.watchConnectivityState(channel.getConnectivityState(false), Infinity, () => {
|
||||
this.handleAdsConnectivityStateUpdate();
|
||||
})
|
||||
|
||||
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService(
|
||||
serverUri,
|
||||
@ -394,7 +398,29 @@ export class XdsClient {
|
||||
clearInterval(this.statsTimer);
|
||||
}
|
||||
|
||||
private handleAdsConnectivityStateUpdate() {
|
||||
if (!this.adsClient) {
|
||||
return;
|
||||
}
|
||||
const state = this.adsClient.getChannel().getConnectivityState(false);
|
||||
if (state === connectivityState.READY && this.adsCall) {
|
||||
this.reportAdsStreamStarted();
|
||||
}
|
||||
if (state === connectivityState.TRANSIENT_FAILURE) {
|
||||
this.reportStreamError({
|
||||
code: status.UNAVAILABLE,
|
||||
details: 'No connection established to xDS server',
|
||||
metadata: new Metadata()
|
||||
});
|
||||
}
|
||||
this.adsClient.getChannel().watchConnectivityState(state, Infinity, () => {
|
||||
this.handleAdsConnectivityStateUpdate();
|
||||
});
|
||||
}
|
||||
|
||||
private handleAdsResponse(message: DiscoveryResponse__Output) {
|
||||
this.receivedAdsResponseOnCurrentStream = true;
|
||||
this.adsBackoff.reset();
|
||||
let handleResponseResult: {
|
||||
result: HandleResponseResult;
|
||||
serviceKind: AdsServiceKind;
|
||||
@ -466,7 +492,7 @@ export class XdsClient {
|
||||
'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
|
||||
);
|
||||
this.adsCall = null;
|
||||
if (streamStatus.code !== status.OK) {
|
||||
if (streamStatus.code !== status.OK && !this.receivedAdsResponseOnCurrentStream) {
|
||||
this.reportStreamError(streamStatus);
|
||||
}
|
||||
/* If the backoff timer is no longer running, we do not need to wait any
|
||||
@ -496,7 +522,9 @@ export class XdsClient {
|
||||
if (this.adsCall !== null) {
|
||||
return;
|
||||
}
|
||||
this.adsCall = this.adsClient.StreamAggregatedResources();
|
||||
this.receivedAdsResponseOnCurrentStream = false;
|
||||
const metadata = new Metadata({waitForReady: true});
|
||||
this.adsCall = this.adsClient.StreamAggregatedResources(metadata);
|
||||
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
|
||||
this.handleAdsResponse(message);
|
||||
});
|
||||
@ -515,7 +543,9 @@ export class XdsClient {
|
||||
this.updateNames(service);
|
||||
}
|
||||
}
|
||||
this.reportAdsStreamStarted();
|
||||
if (this.adsClient.getChannel().getConnectivityState(false) === connectivityState.READY) {
|
||||
this.reportAdsStreamStarted();
|
||||
}
|
||||
}
|
||||
|
||||
private maybeSendAdsMessage(typeUrl: string, resourceNames: string[], responseNonce: string, versionInfo: string, errorMessage?: string) {
|
||||
@ -547,10 +577,6 @@ export class XdsClient {
|
||||
* version info are updated so that it sends the post-update values.
|
||||
*/
|
||||
ack(serviceKind: AdsServiceKind) {
|
||||
/* An ack is the best indication of a successful interaction between the
|
||||
* client and the server, so we can reset the backoff timer here. */
|
||||
this.adsBackoff.reset();
|
||||
|
||||
this.updateNames(serviceKind);
|
||||
}
|
||||
|
||||
|
||||
@ -213,6 +213,9 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
|
||||
}
|
||||
|
||||
reportAdsStreamStart() {
|
||||
if (this.isAdsStreamRunning) {
|
||||
return;
|
||||
}
|
||||
this.isAdsStreamRunning = true;
|
||||
for (const subscriptionEntry of this.subscriptions.values()) {
|
||||
if (subscriptionEntry.cachedResponse === null) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user