mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
grpc-js-xds: Remove all code for handling xDS v2
This commit is contained in:
parent
3003dbea52
commit
4ac8d6dab3
@ -21,7 +21,7 @@ import { ClientStatusDiscoveryServiceHandlers } from "./generated/envoy/service/
|
||||
import { ClientStatusRequest__Output } from "./generated/envoy/service/status/v3/ClientStatusRequest";
|
||||
import { ClientStatusResponse } from "./generated/envoy/service/status/v3/ClientStatusResponse";
|
||||
import { Timestamp } from "./generated/google/protobuf/Timestamp";
|
||||
import { AdsTypeUrl, CDS_TYPE_URL_V2, CDS_TYPE_URL_V3, EDS_TYPE_URL_V2, EDS_TYPE_URL_V3, LDS_TYPE_URL_V2, LDS_TYPE_URL_V3, RDS_TYPE_URL_V2, RDS_TYPE_URL_V3 } from "./resources";
|
||||
import { AdsTypeUrl, CDS_TYPE_URL, EDS_TYPE_URL, LDS_TYPE_URL, RDS_TYPE_URL } from "./resources";
|
||||
import { HandleResponseResult } from "./xds-stream-state/xds-stream-state";
|
||||
import { sendUnaryData, ServerDuplexStream, ServerUnaryCall, status, experimental, loadPackageDefinition, logVerbosity } from '@grpc/grpc-js';
|
||||
import { loadSync } from "@grpc/proto-loader";
|
||||
@ -50,14 +50,10 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null {
|
||||
let clientNode: Node | null = null;
|
||||
|
||||
const configStatus = {
|
||||
[EDS_TYPE_URL_V2]: new Map<string, GenericXdsConfig>(),
|
||||
[EDS_TYPE_URL_V3]: new Map<string, GenericXdsConfig>(),
|
||||
[CDS_TYPE_URL_V2]: new Map<string, GenericXdsConfig>(),
|
||||
[CDS_TYPE_URL_V3]: new Map<string, GenericXdsConfig>(),
|
||||
[RDS_TYPE_URL_V2]: new Map<string, GenericXdsConfig>(),
|
||||
[RDS_TYPE_URL_V3]: new Map<string, GenericXdsConfig>(),
|
||||
[LDS_TYPE_URL_V2]: new Map<string, GenericXdsConfig>(),
|
||||
[LDS_TYPE_URL_V3]: new Map<string, GenericXdsConfig>()
|
||||
[EDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
|
||||
[CDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
|
||||
[RDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
|
||||
[LDS_TYPE_URL]: new Map<string, GenericXdsConfig>()
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@ -18,7 +18,7 @@
|
||||
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental, StatusObject } from '@grpc/grpc-js';
|
||||
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client';
|
||||
import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment';
|
||||
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
|
||||
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
|
||||
import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from './load-balancer-priority';
|
||||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
*/
|
||||
|
||||
import { connectivityState as ConnectivityState, StatusObject, status as Status, experimental } from '@grpc/grpc-js';
|
||||
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
|
||||
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
|
||||
import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds-client';
|
||||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
|
||||
@ -40,7 +40,7 @@ import { XdsClusterManagerLoadBalancingConfig } from './load-balancer-xds-cluste
|
||||
import { ExactValueMatcher, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher';
|
||||
import { envoyFractionToFraction, Fraction } from "./fraction";
|
||||
import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action';
|
||||
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from './resources';
|
||||
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resources';
|
||||
import Duration = experimental.Duration;
|
||||
import { Duration__Output } from './generated/google/protobuf/Duration';
|
||||
import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
|
||||
@ -212,7 +212,6 @@ class XdsResolver implements Resolver {
|
||||
private latestRouteConfigName: string | null = null;
|
||||
|
||||
private latestRouteConfig: RouteConfiguration__Output | null = null;
|
||||
private latestRouteConfigIsV2 = false;
|
||||
|
||||
private clusterRefcounts = new Map<string, {inLastConfig: boolean, refCount: number}>();
|
||||
|
||||
@ -226,15 +225,15 @@ class XdsResolver implements Resolver {
|
||||
private channelOptions: ChannelOptions
|
||||
) {
|
||||
this.ldsWatcher = {
|
||||
onValidUpdate: (update: Listener__Output, isV2: boolean) => {
|
||||
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, update.api_listener!.api_listener!.value);
|
||||
onValidUpdate: (update: Listener__Output) => {
|
||||
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, update.api_listener!.api_listener!.value);
|
||||
const defaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout;
|
||||
if (defaultTimeout === null || defaultTimeout === undefined) {
|
||||
this.latestDefaultTimeout = undefined;
|
||||
} else {
|
||||
this.latestDefaultTimeout = protoDurationToDuration(defaultTimeout);
|
||||
}
|
||||
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
this.ldsHttpFilterConfigs = [];
|
||||
for (const filter of httpConnectionManager.http_filters) {
|
||||
// typed_config must be set here, or validation would have failed
|
||||
@ -260,7 +259,7 @@ class XdsResolver implements Resolver {
|
||||
if (this.latestRouteConfigName) {
|
||||
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
|
||||
}
|
||||
this.handleRouteConfig(httpConnectionManager.route_config!, isV2);
|
||||
this.handleRouteConfig(httpConnectionManager.route_config!);
|
||||
break;
|
||||
default:
|
||||
// This is prevented by the validation rules
|
||||
@ -280,8 +279,8 @@ class XdsResolver implements Resolver {
|
||||
}
|
||||
};
|
||||
this.rdsWatcher = {
|
||||
onValidUpdate: (update: RouteConfiguration__Output, isV2: boolean) => {
|
||||
this.handleRouteConfig(update, isV2);
|
||||
onValidUpdate: (update: RouteConfiguration__Output) => {
|
||||
this.handleRouteConfig(update);
|
||||
},
|
||||
onTransientError: (error: StatusObject) => {
|
||||
/* A transient error only needs to bubble up as a failure if we have
|
||||
@ -311,14 +310,13 @@ class XdsResolver implements Resolver {
|
||||
refCount.refCount -= 1;
|
||||
if (!refCount.inLastConfig && refCount.refCount === 0) {
|
||||
this.clusterRefcounts.delete(clusterName);
|
||||
this.handleRouteConfig(this.latestRouteConfig!, this.latestRouteConfigIsV2);
|
||||
this.handleRouteConfig(this.latestRouteConfig!);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private handleRouteConfig(routeConfig: RouteConfiguration__Output, isV2: boolean) {
|
||||
private handleRouteConfig(routeConfig: RouteConfiguration__Output) {
|
||||
this.latestRouteConfig = routeConfig;
|
||||
this.latestRouteConfigIsV2 = isV2;
|
||||
/* Select the virtual host using the default authority override if it
|
||||
* exists, and the channel target otherwise. */
|
||||
const hostDomain = this.channelOptions['grpc.default_authority'] ?? this.target.path;
|
||||
@ -328,7 +326,7 @@ class XdsResolver implements Resolver {
|
||||
return;
|
||||
}
|
||||
const virtualHostHttpFilterOverrides = new Map<string, HttpFilterConfig>();
|
||||
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const [name, filter] of Object.entries(virtualHost.typed_per_filter_config ?? {})) {
|
||||
const parsedConfig = parseOverrideFilterConfig(filter);
|
||||
if (parsedConfig) {
|
||||
@ -357,7 +355,7 @@ class XdsResolver implements Resolver {
|
||||
timeout = undefined;
|
||||
}
|
||||
const routeHttpFilterOverrides = new Map<string, HttpFilterConfig>();
|
||||
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const [name, filter] of Object.entries(route.typed_per_filter_config ?? {})) {
|
||||
const parsedConfig = parseOverrideFilterConfig(filter);
|
||||
if (parsedConfig) {
|
||||
@ -372,7 +370,7 @@ class XdsResolver implements Resolver {
|
||||
const cluster = route.route!.cluster!;
|
||||
allConfigClusters.add(cluster);
|
||||
const extraFilterFactories: FilterFactory<Filter>[] = [];
|
||||
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const filterConfig of this.ldsHttpFilterConfigs) {
|
||||
if (routeHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!);
|
||||
@ -401,7 +399,7 @@ class XdsResolver implements Resolver {
|
||||
allConfigClusters.add(clusterWeight.name);
|
||||
const extraFilterFactories: FilterFactory<Filter>[] = [];
|
||||
const clusterHttpFilterOverrides = new Map<string, HttpFilterConfig>();
|
||||
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const [name, filter] of Object.entries(clusterWeight.typed_per_filter_config ?? {})) {
|
||||
const parsedConfig = parseOverrideFilterConfig(filter);
|
||||
if (parsedConfig) {
|
||||
|
||||
@ -23,29 +23,22 @@ import { Listener__Output } from './generated/envoy/config/listener/v3/Listener'
|
||||
import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration';
|
||||
import { HttpConnectionManager__Output } from './generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager';
|
||||
|
||||
export const EDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
|
||||
export const CDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.Cluster';
|
||||
export const LDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.Listener';
|
||||
export const RDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.RouteConfiguration';
|
||||
export const EDS_TYPE_URL = 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment';
|
||||
export const CDS_TYPE_URL = 'type.googleapis.com/envoy.config.cluster.v3.Cluster';
|
||||
export const LDS_TYPE_URL = 'type.googleapis.com/envoy.config.listener.v3.Listener';
|
||||
export const RDS_TYPE_URL = 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration';
|
||||
|
||||
export const EDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment';
|
||||
export const CDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.cluster.v3.Cluster';
|
||||
export const LDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.listener.v3.Listener';
|
||||
export const RDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration';
|
||||
|
||||
export type EdsTypeUrl = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment' | 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment';
|
||||
export type CdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Cluster' | 'type.googleapis.com/envoy.config.cluster.v3.Cluster';
|
||||
export type LdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Listener' | 'type.googleapis.com/envoy.config.listener.v3.Listener';
|
||||
export type RdsTypeUrl = 'type.googleapis.com/envoy.api.v2.RouteConfiguration' | 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration';
|
||||
export type EdsTypeUrl = 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment';
|
||||
export type CdsTypeUrl = 'type.googleapis.com/envoy.config.cluster.v3.Cluster';
|
||||
export type LdsTypeUrl = 'type.googleapis.com/envoy.config.listener.v3.Listener';
|
||||
export type RdsTypeUrl = 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration';
|
||||
|
||||
export type AdsTypeUrl = EdsTypeUrl | CdsTypeUrl | RdsTypeUrl | LdsTypeUrl;
|
||||
|
||||
export const HTTP_CONNECTION_MANGER_TYPE_URL_V2 =
|
||||
'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager';
|
||||
export const HTTP_CONNECTION_MANGER_TYPE_URL_V3 =
|
||||
export const HTTP_CONNECTION_MANGER_TYPE_URL =
|
||||
'type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager';
|
||||
|
||||
export type HttpConnectionManagerTypeUrl = 'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager' | 'type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager';
|
||||
export type HttpConnectionManagerTypeUrl = 'type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager';
|
||||
|
||||
/**
|
||||
* Map type URLs to their corresponding message types
|
||||
|
||||
@ -22,17 +22,12 @@ import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, ex
|
||||
import * as adsTypes from './generated/ads';
|
||||
import * as lrsTypes from './generated/lrs';
|
||||
import { loadBootstrapInfo } from './xds-bootstrap';
|
||||
import { Node as NodeV2 } from './generated/envoy/api/v2/core/Node';
|
||||
import { Node as NodeV3 } from './generated/envoy/config/core/v3/Node';
|
||||
import { AggregatedDiscoveryServiceClient as AggregatedDiscoveryServiceClientV2 } from './generated/envoy/service/discovery/v2/AggregatedDiscoveryService';
|
||||
import { AggregatedDiscoveryServiceClient as AggregatedDiscoveryServiceClientV3 } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService';
|
||||
import { DiscoveryRequest as DiscoveryRequestV2 } from './generated/envoy/api/v2/DiscoveryRequest';
|
||||
import { DiscoveryRequest as DiscoveryRequestV3 } from './generated/envoy/service/discovery/v3/DiscoveryRequest';
|
||||
import { Node } from './generated/envoy/config/core/v3/Node';
|
||||
import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService';
|
||||
import { DiscoveryRequest } from './generated/envoy/service/discovery/v3/DiscoveryRequest';
|
||||
import { DiscoveryResponse__Output } from './generated/envoy/service/discovery/v3/DiscoveryResponse';
|
||||
import { LoadReportingServiceClient as LoadReportingServiceClientV2 } from './generated/envoy/service/load_stats/v2/LoadReportingService';
|
||||
import { LoadReportingServiceClient as LoadReportingServiceClientV3 } from './generated/envoy/service/load_stats/v3/LoadReportingService';
|
||||
import { LoadStatsRequest as LoadStatsRequestV2 } from './generated/envoy/service/load_stats/v2/LoadStatsRequest';
|
||||
import { LoadStatsRequest as LoadStatsRequestV3 } from './generated/envoy/service/load_stats/v3/LoadStatsRequest';
|
||||
import { LoadReportingServiceClient } from './generated/envoy/service/load_stats/v3/LoadReportingService';
|
||||
import { LoadStatsRequest } from './generated/envoy/service/load_stats/v3/LoadStatsRequest';
|
||||
import { LoadStatsResponse__Output } from './generated/envoy/service/load_stats/v3/LoadStatsResponse';
|
||||
import { Locality, Locality__Output } from './generated/envoy/config/core/v3/Locality';
|
||||
import { Listener__Output } from './generated/envoy/config/listener/v3/Listener';
|
||||
@ -50,7 +45,7 @@ import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint
|
||||
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
|
||||
import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration';
|
||||
import { Duration } from './generated/google/protobuf/Duration';
|
||||
import { AdsOutputType, AdsTypeUrl, CDS_TYPE_URL_V2, CDS_TYPE_URL_V3, decodeSingleResource, EDS_TYPE_URL_V2, EDS_TYPE_URL_V3, LDS_TYPE_URL_V2, LDS_TYPE_URL_V3, RDS_TYPE_URL_V2, RDS_TYPE_URL_V3 } from './resources';
|
||||
import { AdsOutputType, AdsTypeUrl, CDS_TYPE_URL, decodeSingleResource, EDS_TYPE_URL, LDS_TYPE_URL, RDS_TYPE_URL } from './resources';
|
||||
import { setCsdsClientNode, updateCsdsRequestedNameList, updateCsdsResourceResponse } from './csds';
|
||||
|
||||
const TRACER_NAME = 'xds_client';
|
||||
@ -74,8 +69,6 @@ function loadAdsProtos(): Promise<
|
||||
loadedProtos = protoLoader
|
||||
.load(
|
||||
[
|
||||
'envoy/service/discovery/v2/ads.proto',
|
||||
'envoy/service/load_stats/v2/lrs.proto',
|
||||
'envoy/service/discovery/v3/ads.proto',
|
||||
'envoy/service/load_stats/v3/lrs.proto',
|
||||
],
|
||||
@ -85,6 +78,7 @@ function loadAdsProtos(): Promise<
|
||||
enums: String,
|
||||
defaults: true,
|
||||
oneofs: true,
|
||||
json: true,
|
||||
includeDirs: [
|
||||
// Paths are relative to src/build
|
||||
__dirname + '/../../deps/envoy-api/',
|
||||
@ -234,62 +228,38 @@ interface AdsState {
|
||||
lds: LdsState;
|
||||
}
|
||||
|
||||
enum XdsApiVersion {
|
||||
V2,
|
||||
V3
|
||||
}
|
||||
|
||||
function getResponseMessages<T extends AdsTypeUrl>(
|
||||
targetTypeUrl: T,
|
||||
allowedTypeUrls: string[],
|
||||
resources: Any__Output[]
|
||||
): ResourcePair<AdsOutputType<T>>[] {
|
||||
const result: ResourcePair<AdsOutputType<T>>[] = [];
|
||||
for (const resource of resources) {
|
||||
if (allowedTypeUrls.includes(resource.type_url)) {
|
||||
result.push({
|
||||
resource: decodeSingleResource(targetTypeUrl, resource.value),
|
||||
raw: resource
|
||||
});
|
||||
} else {
|
||||
if (resource.type_url !== targetTypeUrl) {
|
||||
throw new Error(
|
||||
`ADS Error: Invalid resource type ${resource.type_url}, expected ${allowedTypeUrls}`
|
||||
`ADS Error: Invalid resource type ${resource.type_url}, expected ${targetTypeUrl}`
|
||||
);
|
||||
}
|
||||
result.push({
|
||||
resource: decodeSingleResource(targetTypeUrl, resource.value),
|
||||
raw: resource
|
||||
});
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
export class XdsClient {
|
||||
private apiVersion: XdsApiVersion = XdsApiVersion.V2;
|
||||
|
||||
private adsNodeV2: NodeV2 | null = null;
|
||||
private adsNodeV3: NodeV3 | null = null;
|
||||
/* A client initiates connections lazily, so the client we don't use won't
|
||||
* use significant extra resources. */
|
||||
private adsClientV2: AggregatedDiscoveryServiceClientV2 | null = null;
|
||||
private adsClientV3: AggregatedDiscoveryServiceClientV3 | null = null;
|
||||
/* TypeScript typing is structural, so we can take advantage of the fact that
|
||||
* the output structures for the two call types are identical. */
|
||||
private adsCallV2: ClientDuplexStream<
|
||||
DiscoveryRequestV2,
|
||||
DiscoveryResponse__Output
|
||||
> | null = null;
|
||||
private adsCallV3: ClientDuplexStream<
|
||||
DiscoveryRequestV3,
|
||||
private adsNode: Node | null = null;
|
||||
private adsClient: AggregatedDiscoveryServiceClient | null = null;
|
||||
private adsCall: ClientDuplexStream<
|
||||
DiscoveryRequest,
|
||||
DiscoveryResponse__Output
|
||||
> | null = null;
|
||||
|
||||
private lrsNodeV2: NodeV2 | null = null;
|
||||
private lrsNodeV3: NodeV3 | null = null;
|
||||
private lrsClientV2: LoadReportingServiceClientV2 | null = null;
|
||||
private lrsClientV3: LoadReportingServiceClientV3 | null = null;
|
||||
private lrsCallV2: ClientDuplexStream<
|
||||
LoadStatsRequestV2,
|
||||
LoadStatsResponse__Output
|
||||
> | null = null;
|
||||
private lrsCallV3: ClientDuplexStream<
|
||||
LoadStatsRequestV3,
|
||||
private lrsNode: Node | null = null;
|
||||
private lrsClient: LoadReportingServiceClient | null = null;
|
||||
private lrsCall: ClientDuplexStream<
|
||||
LoadStatsRequest,
|
||||
LoadStatsResponse__Output
|
||||
> | null = null;
|
||||
private latestLrsSettings: LoadStatsResponse__Output | null = null;
|
||||
@ -355,48 +325,24 @@ export class XdsClient {
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (bootstrapInfo.xdsServers[0].serverFeatures.indexOf('xds_v3') >= 0) {
|
||||
this.apiVersion = XdsApiVersion.V3;
|
||||
} else {
|
||||
this.apiVersion = XdsApiVersion.V2;
|
||||
}
|
||||
if (bootstrapInfo.xdsServers[0].serverFeatures.indexOf('ignore_resource_deletion') >= 0) {
|
||||
this.adsState.lds.enableIgnoreResourceDeletion();
|
||||
this.adsState.cds.enableIgnoreResourceDeletion();
|
||||
}
|
||||
const nodeV2: NodeV2 = {
|
||||
const userAgentName = 'gRPC Node Pure JS';
|
||||
this.adsNode = {
|
||||
...bootstrapInfo.node,
|
||||
build_version: `gRPC Node Pure JS ${clientVersion}`,
|
||||
user_agent_name: 'gRPC Node Pure JS',
|
||||
};
|
||||
const nodeV3: NodeV3 = {
|
||||
...bootstrapInfo.node,
|
||||
user_agent_name: 'gRPC Node Pure JS',
|
||||
};
|
||||
this.adsNodeV2 = {
|
||||
...nodeV2,
|
||||
user_agent_name: userAgentName,
|
||||
client_features: ['envoy.lb.does_not_support_overprovisioning'],
|
||||
};
|
||||
this.adsNodeV3 = {
|
||||
...nodeV3,
|
||||
client_features: ['envoy.lb.does_not_support_overprovisioning'],
|
||||
};
|
||||
this.lrsNodeV2 = {
|
||||
...nodeV2,
|
||||
this.lrsNode = {
|
||||
...bootstrapInfo.node,
|
||||
user_agent_name: userAgentName,
|
||||
client_features: ['envoy.lrs.supports_send_all_clusters'],
|
||||
};
|
||||
this.lrsNodeV3 = {
|
||||
...nodeV3,
|
||||
client_features: ['envoy.lrs.supports_send_all_clusters'],
|
||||
};
|
||||
setCsdsClientNode(this.adsNodeV3);
|
||||
if (this.apiVersion === XdsApiVersion.V2) {
|
||||
trace('ADS Node: ' + JSON.stringify(this.adsNodeV2, undefined, 2));
|
||||
trace('LRS Node: ' + JSON.stringify(this.lrsNodeV2, undefined, 2));
|
||||
} else {
|
||||
trace('ADS Node: ' + JSON.stringify(this.adsNodeV3, undefined, 2));
|
||||
trace('LRS Node: ' + JSON.stringify(this.lrsNodeV3, undefined, 2));
|
||||
}
|
||||
setCsdsClientNode(this.adsNode);
|
||||
trace('ADS Node: ' + JSON.stringify(this.adsNode, undefined, 2));
|
||||
trace('LRS Node: ' + JSON.stringify(this.lrsNode, undefined, 2));
|
||||
const credentialsConfigs = bootstrapInfo.xdsServers[0].channelCreds;
|
||||
let channelCreds: ChannelCredentials | null = null;
|
||||
for (const config of credentialsConfigs) {
|
||||
@ -421,24 +367,14 @@ export class XdsClient {
|
||||
const serverUri = bootstrapInfo.xdsServers[0].serverUri
|
||||
trace('Starting xDS client connected to server URI ' + bootstrapInfo.xdsServers[0].serverUri);
|
||||
const channel = new Channel(serverUri, channelCreds, channelArgs);
|
||||
this.adsClientV2 = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
|
||||
serverUri,
|
||||
channelCreds,
|
||||
{channelOverride: channel}
|
||||
);
|
||||
this.adsClientV3 = new protoDefinitions.envoy.service.discovery.v3.AggregatedDiscoveryService(
|
||||
this.adsClient = new protoDefinitions.envoy.service.discovery.v3.AggregatedDiscoveryService(
|
||||
serverUri,
|
||||
channelCreds,
|
||||
{channelOverride: channel}
|
||||
);
|
||||
this.maybeStartAdsStream();
|
||||
|
||||
this.lrsClientV2 = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService(
|
||||
serverUri,
|
||||
channelCreds,
|
||||
{channelOverride: channel}
|
||||
);
|
||||
this.lrsClientV3 = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService(
|
||||
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService(
|
||||
serverUri,
|
||||
channelCreds,
|
||||
{channelOverride: channel}
|
||||
@ -463,55 +399,36 @@ export class XdsClient {
|
||||
result: HandleResponseResult;
|
||||
serviceKind: AdsServiceKind;
|
||||
} | null = null;
|
||||
let isV2: boolean;
|
||||
switch (message.type_url) {
|
||||
case EDS_TYPE_URL_V2:
|
||||
case CDS_TYPE_URL_V2:
|
||||
case RDS_TYPE_URL_V2:
|
||||
case LDS_TYPE_URL_V2:
|
||||
isV2 = true;
|
||||
break;
|
||||
default:
|
||||
isV2 = false;
|
||||
}
|
||||
try {
|
||||
switch (message.type_url) {
|
||||
case EDS_TYPE_URL_V2:
|
||||
case EDS_TYPE_URL_V3:
|
||||
case EDS_TYPE_URL:
|
||||
handleResponseResult = {
|
||||
result: this.adsState.eds.handleResponses(
|
||||
getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources),
|
||||
isV2
|
||||
getResponseMessages(EDS_TYPE_URL, message.resources)
|
||||
),
|
||||
serviceKind: 'eds'
|
||||
};
|
||||
break;
|
||||
case CDS_TYPE_URL_V2:
|
||||
case CDS_TYPE_URL_V3:
|
||||
case CDS_TYPE_URL:
|
||||
handleResponseResult = {
|
||||
result: this.adsState.cds.handleResponses(
|
||||
getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources),
|
||||
isV2
|
||||
getResponseMessages(CDS_TYPE_URL, message.resources)
|
||||
),
|
||||
serviceKind: 'cds'
|
||||
};
|
||||
break;
|
||||
case RDS_TYPE_URL_V2:
|
||||
case RDS_TYPE_URL_V3:
|
||||
case RDS_TYPE_URL:
|
||||
handleResponseResult = {
|
||||
result: this.adsState.rds.handleResponses(
|
||||
getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources),
|
||||
isV2
|
||||
getResponseMessages(RDS_TYPE_URL, message.resources)
|
||||
),
|
||||
serviceKind: 'rds'
|
||||
};
|
||||
break;
|
||||
case LDS_TYPE_URL_V2:
|
||||
case LDS_TYPE_URL_V3:
|
||||
case LDS_TYPE_URL:
|
||||
handleResponseResult = {
|
||||
result: this.adsState.lds.handleResponses(
|
||||
getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources),
|
||||
isV2
|
||||
getResponseMessages(LDS_TYPE_URL, message.resources)
|
||||
),
|
||||
serviceKind: 'lds'
|
||||
}
|
||||
@ -548,8 +465,7 @@ export class XdsClient {
|
||||
trace(
|
||||
'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
|
||||
);
|
||||
this.adsCallV2 = null;
|
||||
this.adsCallV3 = null;
|
||||
this.adsCall = null;
|
||||
if (streamStatus.code !== status.OK) {
|
||||
this.reportStreamError(streamStatus);
|
||||
}
|
||||
@ -560,48 +476,6 @@ export class XdsClient {
|
||||
}
|
||||
}
|
||||
|
||||
private maybeStartAdsStreamV2(): boolean {
|
||||
if (this.apiVersion !== XdsApiVersion.V2) {
|
||||
return false;
|
||||
}
|
||||
if (this.adsClientV2 === null) {
|
||||
return false;
|
||||
}
|
||||
if (this.adsCallV2 !== null) {
|
||||
return false;
|
||||
}
|
||||
this.adsCallV2 = this.adsClientV2.StreamAggregatedResources();
|
||||
this.adsCallV2.on('data', (message: DiscoveryResponse__Output) => {
|
||||
this.handleAdsResponse(message);
|
||||
});
|
||||
this.adsCallV2.on('status', (status: StatusObject) => {
|
||||
this.handleAdsCallStatus(status);
|
||||
});
|
||||
this.adsCallV2.on('error', () => {});
|
||||
return true;
|
||||
}
|
||||
|
||||
private maybeStartAdsStreamV3(): boolean {
|
||||
if (this.apiVersion !== XdsApiVersion.V3) {
|
||||
return false;
|
||||
}
|
||||
if (this.adsClientV3 === null) {
|
||||
return false;
|
||||
}
|
||||
if (this.adsCallV3 !== null) {
|
||||
return false;
|
||||
}
|
||||
this.adsCallV3 = this.adsClientV3.StreamAggregatedResources();
|
||||
this.adsCallV3.on('data', (message: DiscoveryResponse__Output) => {
|
||||
this.handleAdsResponse(message);
|
||||
});
|
||||
this.adsCallV3.on('status', (status: StatusObject) => {
|
||||
this.handleAdsCallStatus(status);
|
||||
});
|
||||
this.adsCallV3.on('error', () => {});
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the ADS stream if the client exists and there is not already an
|
||||
* existing stream, and there are resources to request.
|
||||
@ -616,73 +490,55 @@ export class XdsClient {
|
||||
this.adsState.lds.getResourceNames().length === 0) {
|
||||
return;
|
||||
}
|
||||
let streamStarted: boolean;
|
||||
if (this.apiVersion === XdsApiVersion.V2) {
|
||||
streamStarted = this.maybeStartAdsStreamV2();
|
||||
} else {
|
||||
streamStarted = this.maybeStartAdsStreamV3();
|
||||
if (this.adsClient === null) {
|
||||
return;
|
||||
}
|
||||
if (streamStarted) {
|
||||
trace('Started ADS stream');
|
||||
// Backoff relative to when we start the request
|
||||
this.adsBackoff.runOnce();
|
||||
if (this.adsCall !== null) {
|
||||
return;
|
||||
}
|
||||
this.adsCall = this.adsClient.StreamAggregatedResources();
|
||||
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
|
||||
this.handleAdsResponse(message);
|
||||
});
|
||||
this.adsCall.on('status', (status: StatusObject) => {
|
||||
this.handleAdsCallStatus(status);
|
||||
});
|
||||
this.adsCall.on('error', () => {});
|
||||
trace('Started ADS stream');
|
||||
// Backoff relative to when we start the request
|
||||
this.adsBackoff.runOnce();
|
||||
|
||||
const allServiceKinds: AdsServiceKind[] = ['eds', 'cds', 'rds', 'lds'];
|
||||
for (const service of allServiceKinds) {
|
||||
const state = this.adsState[service];
|
||||
if (state.getResourceNames().length > 0) {
|
||||
this.updateNames(service);
|
||||
}
|
||||
const allServiceKinds: AdsServiceKind[] = ['eds', 'cds', 'rds', 'lds'];
|
||||
for (const service of allServiceKinds) {
|
||||
const state = this.adsState[service];
|
||||
if (state.getResourceNames().length > 0) {
|
||||
this.updateNames(service);
|
||||
}
|
||||
this.reportAdsStreamStarted();
|
||||
}
|
||||
this.reportAdsStreamStarted();
|
||||
}
|
||||
|
||||
private maybeSendAdsMessage(typeUrl: string, resourceNames: string[], responseNonce: string, versionInfo: string, errorMessage?: string) {
|
||||
if (this.apiVersion === XdsApiVersion.V2) {
|
||||
this.adsCallV2?.write({
|
||||
node: this.adsNodeV2!,
|
||||
type_url: typeUrl,
|
||||
resource_names: resourceNames,
|
||||
response_nonce: responseNonce,
|
||||
version_info: versionInfo,
|
||||
error_detail: errorMessage ? { message: errorMessage } : undefined
|
||||
});
|
||||
} else {
|
||||
this.adsCallV3?.write({
|
||||
node: this.adsNodeV3!,
|
||||
type_url: typeUrl,
|
||||
resource_names: resourceNames,
|
||||
response_nonce: responseNonce,
|
||||
version_info: versionInfo,
|
||||
error_detail: errorMessage ? { message: errorMessage } : undefined
|
||||
});
|
||||
}
|
||||
this.adsCall?.write({
|
||||
node: this.adsNode!,
|
||||
type_url: typeUrl,
|
||||
resource_names: resourceNames,
|
||||
response_nonce: responseNonce,
|
||||
version_info: versionInfo,
|
||||
error_detail: errorMessage ? { message: errorMessage } : undefined
|
||||
});
|
||||
}
|
||||
|
||||
private getTypeUrl(serviceKind: AdsServiceKind): AdsTypeUrl {
|
||||
if (this.apiVersion === XdsApiVersion.V2) {
|
||||
switch (serviceKind) {
|
||||
case 'eds':
|
||||
return EDS_TYPE_URL_V2;
|
||||
case 'cds':
|
||||
return CDS_TYPE_URL_V2;
|
||||
case 'rds':
|
||||
return RDS_TYPE_URL_V2;
|
||||
case 'lds':
|
||||
return LDS_TYPE_URL_V2;
|
||||
}
|
||||
} else {
|
||||
switch (serviceKind) {
|
||||
case 'eds':
|
||||
return EDS_TYPE_URL_V3;
|
||||
case 'cds':
|
||||
return CDS_TYPE_URL_V3;
|
||||
case 'rds':
|
||||
return RDS_TYPE_URL_V3;
|
||||
case 'lds':
|
||||
return LDS_TYPE_URL_V3;
|
||||
}
|
||||
switch (serviceKind) {
|
||||
case 'eds':
|
||||
return EDS_TYPE_URL;
|
||||
case 'cds':
|
||||
return CDS_TYPE_URL;
|
||||
case 'rds':
|
||||
return RDS_TYPE_URL;
|
||||
case 'lds':
|
||||
return LDS_TYPE_URL;
|
||||
}
|
||||
}
|
||||
|
||||
@ -708,20 +564,16 @@ export class XdsClient {
|
||||
let versionInfo: string;
|
||||
let serviceKind: AdsServiceKind | null;
|
||||
switch (typeUrl) {
|
||||
case EDS_TYPE_URL_V2:
|
||||
case EDS_TYPE_URL_V3:
|
||||
case EDS_TYPE_URL:
|
||||
serviceKind = 'eds';
|
||||
break;
|
||||
case CDS_TYPE_URL_V2:
|
||||
case CDS_TYPE_URL_V3:
|
||||
case CDS_TYPE_URL:
|
||||
serviceKind = 'cds';
|
||||
break;
|
||||
case RDS_TYPE_URL_V2:
|
||||
case RDS_TYPE_URL_V3:
|
||||
case RDS_TYPE_URL:
|
||||
serviceKind = 'rds';
|
||||
break;
|
||||
case LDS_TYPE_URL_V2:
|
||||
case LDS_TYPE_URL_V3:
|
||||
case LDS_TYPE_URL:
|
||||
serviceKind = 'lds';
|
||||
break;
|
||||
default:
|
||||
@ -731,7 +583,7 @@ export class XdsClient {
|
||||
if (serviceKind) {
|
||||
this.adsState[serviceKind].reportStreamError({
|
||||
code: status.UNAVAILABLE,
|
||||
details: message + ' Node ID=' + this.adsNodeV3!.id,
|
||||
details: message + ' Node ID=' + this.adsNode!.id,
|
||||
metadata: new Metadata()
|
||||
});
|
||||
resourceNames = this.adsState[serviceKind].getResourceNames();
|
||||
@ -750,19 +602,15 @@ export class XdsClient {
|
||||
this.adsState.cds.getResourceNames().length === 0 &&
|
||||
this.adsState.rds.getResourceNames().length === 0 &&
|
||||
this.adsState.lds.getResourceNames().length === 0) {
|
||||
this.adsCallV2?.end();
|
||||
this.adsCallV2 = null;
|
||||
this.adsCallV3?.end();
|
||||
this.adsCallV3 = null;
|
||||
this.lrsCallV2?.end();
|
||||
this.lrsCallV2 = null;
|
||||
this.lrsCallV3?.end();
|
||||
this.lrsCallV3 = null;
|
||||
this.adsCall?.end();
|
||||
this.adsCall = null;
|
||||
this.lrsCall?.end();
|
||||
this.lrsCall = null;
|
||||
return;
|
||||
}
|
||||
this.maybeStartAdsStream();
|
||||
this.maybeStartLrsStream();
|
||||
if (!this.adsCallV2 && !this.adsCallV3) {
|
||||
if (!this.adsCall) {
|
||||
/* If the stream is not set up yet at this point, shortcut the rest
|
||||
* becuase nothing will actually be sent. This would mainly happen if
|
||||
* the bootstrap file has not been read yet. In that case, the output
|
||||
@ -776,7 +624,7 @@ export class XdsClient {
|
||||
}
|
||||
|
||||
private reportStreamError(status: StatusObject) {
|
||||
status = {...status, details: status.details + ' Node ID=' + this.adsNodeV3!.id};
|
||||
status = {...status, details: status.details + ' Node ID=' + this.adsNode!.id};
|
||||
this.adsState.eds.reportStreamError(status);
|
||||
this.adsState.cds.reportStreamError(status);
|
||||
this.adsState.rds.reportStreamError(status);
|
||||
@ -823,8 +671,7 @@ export class XdsClient {
|
||||
trace(
|
||||
'LRS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
|
||||
);
|
||||
this.lrsCallV2 = null;
|
||||
this.lrsCallV3 = null;
|
||||
this.lrsCall = null;
|
||||
clearInterval(this.statsTimer);
|
||||
/* If the backoff timer is no longer running, we do not need to wait any
|
||||
* more to start the new call. */
|
||||
@ -833,41 +680,22 @@ export class XdsClient {
|
||||
}
|
||||
}
|
||||
|
||||
private maybeStartLrsStreamV2(): boolean {
|
||||
if (!this.lrsClientV2) {
|
||||
return false;
|
||||
}
|
||||
if (this.lrsCallV2) {
|
||||
return false;
|
||||
}
|
||||
this.lrsCallV2 = this.lrsClientV2.streamLoadStats();
|
||||
this.receivedLrsSettingsForCurrentStream = false;
|
||||
this.lrsCallV2.on('data', (message: LoadStatsResponse__Output) => {
|
||||
this.handleLrsResponse(message);
|
||||
});
|
||||
this.lrsCallV2.on('status', (status: StatusObject) => {
|
||||
this.handleLrsCallStatus(status);
|
||||
});
|
||||
this.lrsCallV2.on('error', () => {});
|
||||
return true;
|
||||
}
|
||||
|
||||
private maybeStartLrsStreamV3(): boolean {
|
||||
if (!this.lrsClientV3) {
|
||||
if (!this.lrsClient) {
|
||||
return false;
|
||||
}
|
||||
if (this.lrsCallV3) {
|
||||
if (this.lrsCall) {
|
||||
return false;
|
||||
}
|
||||
this.lrsCallV3 = this.lrsClientV3.streamLoadStats();
|
||||
this.lrsCall = this.lrsClient.streamLoadStats();
|
||||
this.receivedLrsSettingsForCurrentStream = false;
|
||||
this.lrsCallV3.on('data', (message: LoadStatsResponse__Output) => {
|
||||
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
|
||||
this.handleLrsResponse(message);
|
||||
});
|
||||
this.lrsCallV3.on('status', (status: StatusObject) => {
|
||||
this.lrsCall.on('status', (status: StatusObject) => {
|
||||
this.handleLrsCallStatus(status);
|
||||
});
|
||||
this.lrsCallV3.on('error', () => {});
|
||||
this.lrsCall.on('error', () => {});
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -881,39 +709,37 @@ export class XdsClient {
|
||||
this.adsState.lds.getResourceNames().length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
let streamStarted: boolean;
|
||||
if (this.apiVersion === XdsApiVersion.V2) {
|
||||
streamStarted = this.maybeStartLrsStreamV2();
|
||||
} else {
|
||||
streamStarted = this.maybeStartLrsStreamV3();
|
||||
if (!this.lrsClient) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (streamStarted) {
|
||||
trace('Starting LRS stream');
|
||||
this.lrsBackoff.runOnce();
|
||||
/* Send buffered stats information when starting LRS stream. If there is no
|
||||
* buffered stats information, it will still send the node field. */
|
||||
this.sendStats();
|
||||
if (this.lrsCall) {
|
||||
return;
|
||||
}
|
||||
this.lrsCall = this.lrsClient.streamLoadStats();
|
||||
this.receivedLrsSettingsForCurrentStream = false;
|
||||
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
|
||||
this.handleLrsResponse(message);
|
||||
});
|
||||
this.lrsCall.on('status', (status: StatusObject) => {
|
||||
this.handleLrsCallStatus(status);
|
||||
});
|
||||
this.lrsCall.on('error', () => {});
|
||||
trace('Starting LRS stream');
|
||||
this.lrsBackoff.runOnce();
|
||||
/* Send buffered stats information when starting LRS stream. If there is no
|
||||
* buffered stats information, it will still send the node field. */
|
||||
this.sendStats();
|
||||
}
|
||||
|
||||
private maybeSendLrsMessage(clusterStats: ClusterStats[]) {
|
||||
if (this.apiVersion === XdsApiVersion.V2) {
|
||||
this.lrsCallV2?.write({
|
||||
node: this.lrsNodeV2!,
|
||||
cluster_stats: clusterStats
|
||||
});
|
||||
} else {
|
||||
this.lrsCallV3?.write({
|
||||
node: this.lrsNodeV3!,
|
||||
cluster_stats: clusterStats
|
||||
});
|
||||
}
|
||||
this.lrsCall?.write({
|
||||
node: this.lrsNode!,
|
||||
cluster_stats: clusterStats
|
||||
});
|
||||
}
|
||||
|
||||
private sendStats() {
|
||||
if (this.lrsCallV2 === null && this.lrsCallV3 === null) {
|
||||
if (this.lrsCall === null) {
|
||||
return;
|
||||
}
|
||||
if (!this.latestLrsSettings) {
|
||||
@ -1121,14 +947,10 @@ export class XdsClient {
|
||||
}
|
||||
|
||||
private shutdown(): void {
|
||||
this.adsCallV2?.cancel();
|
||||
this.adsCallV3?.cancel();
|
||||
this.adsClientV2?.close();
|
||||
this.adsClientV3?.close();
|
||||
this.lrsCallV2?.cancel();
|
||||
this.lrsCallV3?.cancel();
|
||||
this.lrsClientV2?.close();
|
||||
this.lrsClientV3?.close();
|
||||
this.adsCall?.cancel();
|
||||
this.adsClient?.close();
|
||||
this.lrsCall?.cancel();
|
||||
this.lrsClient?.close();
|
||||
this.hasShutdown = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,7 +19,7 @@ import { experimental, logVerbosity } from "@grpc/grpc-js";
|
||||
import { Listener__Output } from '../generated/envoy/config/listener/v3/Listener';
|
||||
import { RdsState } from "./rds-state";
|
||||
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";
|
||||
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V2, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from '../resources';
|
||||
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from '../resources';
|
||||
import { getTopLevelFilterUrl, validateTopLevelFilter } from '../http-filter';
|
||||
import { EXPERIMENTAL_FAULT_INJECTION } from '../environment';
|
||||
|
||||
@ -46,18 +46,17 @@ export class LdsState extends BaseXdsStreamState<Listener__Output> implements Xd
|
||||
super(updateResourceNames);
|
||||
}
|
||||
|
||||
public validateResponse(message: Listener__Output, isV2: boolean): boolean {
|
||||
public validateResponse(message: Listener__Output): boolean {
|
||||
if (
|
||||
!(
|
||||
message.api_listener?.api_listener &&
|
||||
(message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL_V2 ||
|
||||
message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL_V3)
|
||||
message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL
|
||||
)
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener.value);
|
||||
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
|
||||
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, message.api_listener!.api_listener.value);
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
const filterNames = new Set<string>();
|
||||
for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) {
|
||||
if (filterNames.has(httpFilter.name)) {
|
||||
@ -89,7 +88,7 @@ export class LdsState extends BaseXdsStreamState<Listener__Output> implements Xd
|
||||
case 'rds':
|
||||
return !!httpConnectionManager.rds?.config_source?.ads;
|
||||
case 'route_config':
|
||||
return this.rdsState.validateResponse(httpConnectionManager.route_config!, isV2);
|
||||
return this.rdsState.validateResponse(httpConnectionManager.route_config!);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
|
||||
protected getProtocolName(): string {
|
||||
return 'RDS';
|
||||
}
|
||||
validateResponse(message: RouteConfiguration__Output, isV2: boolean): boolean {
|
||||
validateResponse(message: RouteConfiguration__Output): boolean {
|
||||
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
|
||||
for (const virtualHost of message.virtual_hosts) {
|
||||
for (const domainPattern of virtualHost.domains) {
|
||||
@ -55,7 +55,7 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const filterConfig of Object.values(virtualHost.typed_per_filter_config ?? {})) {
|
||||
if (!validateOverrideFilter(filterConfig)) {
|
||||
return false;
|
||||
@ -81,7 +81,7 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
|
||||
if ((route.route === undefined) || (route.route === null) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) {
|
||||
return false;
|
||||
}
|
||||
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) {
|
||||
if (!validateOverrideFilter(filterConfig)) {
|
||||
return false;
|
||||
@ -99,7 +99,7 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
|
||||
if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) {
|
||||
return false;
|
||||
}
|
||||
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const weightedCluster of route.route!.weighted_clusters!.clusters) {
|
||||
for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) {
|
||||
if (!validateOverrideFilter(filterConfig)) {
|
||||
|
||||
@ -26,7 +26,7 @@ export interface Watcher<UpdateType> {
|
||||
* message type into a library-specific configuration object type, to
|
||||
* remove a lot of duplicate logic, including logic for handling that
|
||||
* flag. */
|
||||
onValidUpdate(update: UpdateType, isV2: boolean): void;
|
||||
onValidUpdate(update: UpdateType): void;
|
||||
onTransientError(error: StatusObject): void;
|
||||
onResourceDoesNotExist(): void;
|
||||
}
|
||||
@ -85,7 +85,6 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
|
||||
nonce = '';
|
||||
|
||||
private subscriptions: Map<string, SubscriptionEntry<ResponseType>> = new Map<string, SubscriptionEntry<ResponseType>>();
|
||||
private latestIsV2 = false;
|
||||
private isAdsStreamRunning = false;
|
||||
private ignoreResourceDeletion = false;
|
||||
|
||||
@ -128,7 +127,7 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
|
||||
* the same happens here */
|
||||
process.nextTick(() => {
|
||||
this.trace('Reporting existing update for new watcher for name ' + name);
|
||||
watcher.onValidUpdate(cachedResponse, this.latestIsV2);
|
||||
watcher.onValidUpdate(cachedResponse);
|
||||
});
|
||||
}
|
||||
if (addedName) {
|
||||
@ -157,7 +156,7 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
|
||||
getResourceNames(): string[] {
|
||||
return Array.from(this.subscriptions.keys());
|
||||
}
|
||||
handleResponses(responses: ResourcePair<ResponseType>[], isV2: boolean): HandleResponseResult {
|
||||
handleResponses(responses: ResourcePair<ResponseType>[]): HandleResponseResult {
|
||||
const validResponses: ResponseType[] = [];
|
||||
let result: HandleResponseResult = {
|
||||
accepted: [],
|
||||
@ -166,7 +165,7 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
|
||||
}
|
||||
for (const {resource, raw} of responses) {
|
||||
const resourceName = this.getResourceName(resource);
|
||||
if (this.validateResponse(resource, isV2)) {
|
||||
if (this.validateResponse(resource)) {
|
||||
validResponses.push(resource);
|
||||
result.accepted.push({
|
||||
name: resourceName,
|
||||
@ -180,7 +179,6 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
|
||||
});
|
||||
}
|
||||
}
|
||||
this.latestIsV2 = isV2;
|
||||
const allResourceNames = new Set<string>();
|
||||
for (const resource of validResponses) {
|
||||
const resourceName = this.getResourceName(resource);
|
||||
@ -189,7 +187,7 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
|
||||
if (subscriptionEntry) {
|
||||
const watchers = subscriptionEntry.watchers;
|
||||
for (const watcher of watchers) {
|
||||
watcher.onValidUpdate(resource, isV2);
|
||||
watcher.onValidUpdate(resource);
|
||||
}
|
||||
clearTimeout(subscriptionEntry.resourceTimer);
|
||||
subscriptionEntry.cachedResponse = resource;
|
||||
@ -259,9 +257,8 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
|
||||
* This function is public so that the LDS validateResponse can call into
|
||||
* the RDS validateResponse.
|
||||
* @param resource The resource object sent by the xDS server
|
||||
* @param isV2 If true, the resource is an xDS V2 resource instead of xDS V3
|
||||
*/
|
||||
public abstract validateResponse(resource: ResponseType, isV2: boolean): boolean;
|
||||
public abstract validateResponse(resource: ResponseType): boolean;
|
||||
/**
|
||||
* Get the name of a resource object. The name is some field of the object, so
|
||||
* getting it depends on the specific type.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user