From 4ac8d6dab36b494cd49bca0a83068b067dd562e0 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 12 Oct 2022 13:48:39 -0700 Subject: [PATCH] grpc-js-xds: Remove all code for handling xDS v2 --- packages/grpc-js-xds/src/csds.ts | 14 +- packages/grpc-js-xds/src/load-balancer-eds.ts | 2 +- packages/grpc-js-xds/src/load-balancer-lrs.ts | 2 +- packages/grpc-js-xds/src/resolver-xds.ts | 28 +- packages/grpc-js-xds/src/resources.ts | 27 +- packages/grpc-js-xds/src/xds-client.ts | 428 +++++------------- .../src/xds-stream-state/lds-state.ts | 13 +- .../src/xds-stream-state/rds-state.ts | 8 +- .../src/xds-stream-state/xds-stream-state.ts | 15 +- 9 files changed, 171 insertions(+), 366 deletions(-) diff --git a/packages/grpc-js-xds/src/csds.ts b/packages/grpc-js-xds/src/csds.ts index 6454e1ad..114c1804 100644 --- a/packages/grpc-js-xds/src/csds.ts +++ b/packages/grpc-js-xds/src/csds.ts @@ -21,7 +21,7 @@ import { ClientStatusDiscoveryServiceHandlers } from "./generated/envoy/service/ import { ClientStatusRequest__Output } from "./generated/envoy/service/status/v3/ClientStatusRequest"; import { ClientStatusResponse } from "./generated/envoy/service/status/v3/ClientStatusResponse"; import { Timestamp } from "./generated/google/protobuf/Timestamp"; -import { AdsTypeUrl, CDS_TYPE_URL_V2, CDS_TYPE_URL_V3, EDS_TYPE_URL_V2, EDS_TYPE_URL_V3, LDS_TYPE_URL_V2, LDS_TYPE_URL_V3, RDS_TYPE_URL_V2, RDS_TYPE_URL_V3 } from "./resources"; +import { AdsTypeUrl, CDS_TYPE_URL, EDS_TYPE_URL, LDS_TYPE_URL, RDS_TYPE_URL } from "./resources"; import { HandleResponseResult } from "./xds-stream-state/xds-stream-state"; import { sendUnaryData, ServerDuplexStream, ServerUnaryCall, status, experimental, loadPackageDefinition, logVerbosity } from '@grpc/grpc-js'; import { loadSync } from "@grpc/proto-loader"; @@ -50,14 +50,10 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null { let clientNode: Node | null = null; const configStatus = { - [EDS_TYPE_URL_V2]: new Map(), - [EDS_TYPE_URL_V3]: new Map(), - [CDS_TYPE_URL_V2]: new Map(), - [CDS_TYPE_URL_V3]: new Map(), - [RDS_TYPE_URL_V2]: new Map(), - [RDS_TYPE_URL_V3]: new Map(), - [LDS_TYPE_URL_V2]: new Map(), - [LDS_TYPE_URL_V3]: new Map() + [EDS_TYPE_URL]: new Map(), + [CDS_TYPE_URL]: new Map(), + [RDS_TYPE_URL]: new Map(), + [LDS_TYPE_URL]: new Map() }; /** diff --git a/packages/grpc-js-xds/src/load-balancer-eds.ts b/packages/grpc-js-xds/src/load-balancer-eds.ts index e7aac057..6bec1fd1 100644 --- a/packages/grpc-js-xds/src/load-balancer-eds.ts +++ b/packages/grpc-js-xds/src/load-balancer-eds.ts @@ -18,7 +18,7 @@ import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental, StatusObject } from '@grpc/grpc-js'; import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client'; import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment'; -import { Locality__Output } from './generated/envoy/api/v2/core/Locality'; +import { Locality__Output } from './generated/envoy/config/core/v3/Locality'; import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from './load-balancer-priority'; import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; diff --git a/packages/grpc-js-xds/src/load-balancer-lrs.ts b/packages/grpc-js-xds/src/load-balancer-lrs.ts index 145501fe..27c62c19 100644 --- a/packages/grpc-js-xds/src/load-balancer-lrs.ts +++ b/packages/grpc-js-xds/src/load-balancer-lrs.ts @@ -16,7 +16,7 @@ */ import { connectivityState as ConnectivityState, StatusObject, status as Status, experimental } from '@grpc/grpc-js'; -import { Locality__Output } from './generated/envoy/api/v2/core/Locality'; +import { Locality__Output } from './generated/envoy/config/core/v3/Locality'; import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds-client'; import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 8c447931..496c3709 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -40,7 +40,7 @@ import { XdsClusterManagerLoadBalancingConfig } from './load-balancer-xds-cluste import { ExactValueMatcher, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher'; import { envoyFractionToFraction, Fraction } from "./fraction"; import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action'; -import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from './resources'; +import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resources'; import Duration = experimental.Duration; import { Duration__Output } from './generated/google/protobuf/Duration'; import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter'; @@ -212,7 +212,6 @@ class XdsResolver implements Resolver { private latestRouteConfigName: string | null = null; private latestRouteConfig: RouteConfiguration__Output | null = null; - private latestRouteConfigIsV2 = false; private clusterRefcounts = new Map(); @@ -226,15 +225,15 @@ class XdsResolver implements Resolver { private channelOptions: ChannelOptions ) { this.ldsWatcher = { - onValidUpdate: (update: Listener__Output, isV2: boolean) => { - const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, update.api_listener!.api_listener!.value); + onValidUpdate: (update: Listener__Output) => { + const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, update.api_listener!.api_listener!.value); const defaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout; if (defaultTimeout === null || defaultTimeout === undefined) { this.latestDefaultTimeout = undefined; } else { this.latestDefaultTimeout = protoDurationToDuration(defaultTimeout); } - if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { + if (EXPERIMENTAL_FAULT_INJECTION) { this.ldsHttpFilterConfigs = []; for (const filter of httpConnectionManager.http_filters) { // typed_config must be set here, or validation would have failed @@ -260,7 +259,7 @@ class XdsResolver implements Resolver { if (this.latestRouteConfigName) { getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); } - this.handleRouteConfig(httpConnectionManager.route_config!, isV2); + this.handleRouteConfig(httpConnectionManager.route_config!); break; default: // This is prevented by the validation rules @@ -280,8 +279,8 @@ class XdsResolver implements Resolver { } }; this.rdsWatcher = { - onValidUpdate: (update: RouteConfiguration__Output, isV2: boolean) => { - this.handleRouteConfig(update, isV2); + onValidUpdate: (update: RouteConfiguration__Output) => { + this.handleRouteConfig(update); }, onTransientError: (error: StatusObject) => { /* A transient error only needs to bubble up as a failure if we have @@ -311,14 +310,13 @@ class XdsResolver implements Resolver { refCount.refCount -= 1; if (!refCount.inLastConfig && refCount.refCount === 0) { this.clusterRefcounts.delete(clusterName); - this.handleRouteConfig(this.latestRouteConfig!, this.latestRouteConfigIsV2); + this.handleRouteConfig(this.latestRouteConfig!); } } } - private handleRouteConfig(routeConfig: RouteConfiguration__Output, isV2: boolean) { + private handleRouteConfig(routeConfig: RouteConfiguration__Output) { this.latestRouteConfig = routeConfig; - this.latestRouteConfigIsV2 = isV2; /* Select the virtual host using the default authority override if it * exists, and the channel target otherwise. */ const hostDomain = this.channelOptions['grpc.default_authority'] ?? this.target.path; @@ -328,7 +326,7 @@ class XdsResolver implements Resolver { return; } const virtualHostHttpFilterOverrides = new Map(); - if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { + if (EXPERIMENTAL_FAULT_INJECTION) { for (const [name, filter] of Object.entries(virtualHost.typed_per_filter_config ?? {})) { const parsedConfig = parseOverrideFilterConfig(filter); if (parsedConfig) { @@ -357,7 +355,7 @@ class XdsResolver implements Resolver { timeout = undefined; } const routeHttpFilterOverrides = new Map(); - if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { + if (EXPERIMENTAL_FAULT_INJECTION) { for (const [name, filter] of Object.entries(route.typed_per_filter_config ?? {})) { const parsedConfig = parseOverrideFilterConfig(filter); if (parsedConfig) { @@ -372,7 +370,7 @@ class XdsResolver implements Resolver { const cluster = route.route!.cluster!; allConfigClusters.add(cluster); const extraFilterFactories: FilterFactory[] = []; - if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { + if (EXPERIMENTAL_FAULT_INJECTION) { for (const filterConfig of this.ldsHttpFilterConfigs) { if (routeHttpFilterOverrides.has(filterConfig.name)) { const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!); @@ -401,7 +399,7 @@ class XdsResolver implements Resolver { allConfigClusters.add(clusterWeight.name); const extraFilterFactories: FilterFactory[] = []; const clusterHttpFilterOverrides = new Map(); - if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { + if (EXPERIMENTAL_FAULT_INJECTION) { for (const [name, filter] of Object.entries(clusterWeight.typed_per_filter_config ?? {})) { const parsedConfig = parseOverrideFilterConfig(filter); if (parsedConfig) { diff --git a/packages/grpc-js-xds/src/resources.ts b/packages/grpc-js-xds/src/resources.ts index 4a7e2276..0972ce97 100644 --- a/packages/grpc-js-xds/src/resources.ts +++ b/packages/grpc-js-xds/src/resources.ts @@ -23,29 +23,22 @@ import { Listener__Output } from './generated/envoy/config/listener/v3/Listener' import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration'; import { HttpConnectionManager__Output } from './generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager'; -export const EDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment'; -export const CDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.Cluster'; -export const LDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.Listener'; -export const RDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.RouteConfiguration'; +export const EDS_TYPE_URL = 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment'; +export const CDS_TYPE_URL = 'type.googleapis.com/envoy.config.cluster.v3.Cluster'; +export const LDS_TYPE_URL = 'type.googleapis.com/envoy.config.listener.v3.Listener'; +export const RDS_TYPE_URL = 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration'; -export const EDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment'; -export const CDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.cluster.v3.Cluster'; -export const LDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.listener.v3.Listener'; -export const RDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration'; - -export type EdsTypeUrl = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment' | 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment'; -export type CdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Cluster' | 'type.googleapis.com/envoy.config.cluster.v3.Cluster'; -export type LdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Listener' | 'type.googleapis.com/envoy.config.listener.v3.Listener'; -export type RdsTypeUrl = 'type.googleapis.com/envoy.api.v2.RouteConfiguration' | 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration'; +export type EdsTypeUrl = 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment'; +export type CdsTypeUrl = 'type.googleapis.com/envoy.config.cluster.v3.Cluster'; +export type LdsTypeUrl = 'type.googleapis.com/envoy.config.listener.v3.Listener'; +export type RdsTypeUrl = 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration'; export type AdsTypeUrl = EdsTypeUrl | CdsTypeUrl | RdsTypeUrl | LdsTypeUrl; -export const HTTP_CONNECTION_MANGER_TYPE_URL_V2 = - 'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager'; -export const HTTP_CONNECTION_MANGER_TYPE_URL_V3 = +export const HTTP_CONNECTION_MANGER_TYPE_URL = 'type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager'; -export type HttpConnectionManagerTypeUrl = 'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager' | 'type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager'; +export type HttpConnectionManagerTypeUrl = 'type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager'; /** * Map type URLs to their corresponding message types diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 0c8126cb..439ed80e 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -22,17 +22,12 @@ import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, ex import * as adsTypes from './generated/ads'; import * as lrsTypes from './generated/lrs'; import { loadBootstrapInfo } from './xds-bootstrap'; -import { Node as NodeV2 } from './generated/envoy/api/v2/core/Node'; -import { Node as NodeV3 } from './generated/envoy/config/core/v3/Node'; -import { AggregatedDiscoveryServiceClient as AggregatedDiscoveryServiceClientV2 } from './generated/envoy/service/discovery/v2/AggregatedDiscoveryService'; -import { AggregatedDiscoveryServiceClient as AggregatedDiscoveryServiceClientV3 } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService'; -import { DiscoveryRequest as DiscoveryRequestV2 } from './generated/envoy/api/v2/DiscoveryRequest'; -import { DiscoveryRequest as DiscoveryRequestV3 } from './generated/envoy/service/discovery/v3/DiscoveryRequest'; +import { Node } from './generated/envoy/config/core/v3/Node'; +import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService'; +import { DiscoveryRequest } from './generated/envoy/service/discovery/v3/DiscoveryRequest'; import { DiscoveryResponse__Output } from './generated/envoy/service/discovery/v3/DiscoveryResponse'; -import { LoadReportingServiceClient as LoadReportingServiceClientV2 } from './generated/envoy/service/load_stats/v2/LoadReportingService'; -import { LoadReportingServiceClient as LoadReportingServiceClientV3 } from './generated/envoy/service/load_stats/v3/LoadReportingService'; -import { LoadStatsRequest as LoadStatsRequestV2 } from './generated/envoy/service/load_stats/v2/LoadStatsRequest'; -import { LoadStatsRequest as LoadStatsRequestV3 } from './generated/envoy/service/load_stats/v3/LoadStatsRequest'; +import { LoadReportingServiceClient } from './generated/envoy/service/load_stats/v3/LoadReportingService'; +import { LoadStatsRequest } from './generated/envoy/service/load_stats/v3/LoadStatsRequest'; import { LoadStatsResponse__Output } from './generated/envoy/service/load_stats/v3/LoadStatsResponse'; import { Locality, Locality__Output } from './generated/envoy/config/core/v3/Locality'; import { Listener__Output } from './generated/envoy/config/listener/v3/Listener'; @@ -50,7 +45,7 @@ import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster'; import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration'; import { Duration } from './generated/google/protobuf/Duration'; -import { AdsOutputType, AdsTypeUrl, CDS_TYPE_URL_V2, CDS_TYPE_URL_V3, decodeSingleResource, EDS_TYPE_URL_V2, EDS_TYPE_URL_V3, LDS_TYPE_URL_V2, LDS_TYPE_URL_V3, RDS_TYPE_URL_V2, RDS_TYPE_URL_V3 } from './resources'; +import { AdsOutputType, AdsTypeUrl, CDS_TYPE_URL, decodeSingleResource, EDS_TYPE_URL, LDS_TYPE_URL, RDS_TYPE_URL } from './resources'; import { setCsdsClientNode, updateCsdsRequestedNameList, updateCsdsResourceResponse } from './csds'; const TRACER_NAME = 'xds_client'; @@ -74,8 +69,6 @@ function loadAdsProtos(): Promise< loadedProtos = protoLoader .load( [ - 'envoy/service/discovery/v2/ads.proto', - 'envoy/service/load_stats/v2/lrs.proto', 'envoy/service/discovery/v3/ads.proto', 'envoy/service/load_stats/v3/lrs.proto', ], @@ -85,6 +78,7 @@ function loadAdsProtos(): Promise< enums: String, defaults: true, oneofs: true, + json: true, includeDirs: [ // Paths are relative to src/build __dirname + '/../../deps/envoy-api/', @@ -234,62 +228,38 @@ interface AdsState { lds: LdsState; } -enum XdsApiVersion { - V2, - V3 -} - function getResponseMessages( targetTypeUrl: T, - allowedTypeUrls: string[], resources: Any__Output[] ): ResourcePair>[] { const result: ResourcePair>[] = []; for (const resource of resources) { - if (allowedTypeUrls.includes(resource.type_url)) { - result.push({ - resource: decodeSingleResource(targetTypeUrl, resource.value), - raw: resource - }); - } else { + if (resource.type_url !== targetTypeUrl) { throw new Error( - `ADS Error: Invalid resource type ${resource.type_url}, expected ${allowedTypeUrls}` + `ADS Error: Invalid resource type ${resource.type_url}, expected ${targetTypeUrl}` ); } + result.push({ + resource: decodeSingleResource(targetTypeUrl, resource.value), + raw: resource + }); } return result; } export class XdsClient { - private apiVersion: XdsApiVersion = XdsApiVersion.V2; - private adsNodeV2: NodeV2 | null = null; - private adsNodeV3: NodeV3 | null = null; - /* A client initiates connections lazily, so the client we don't use won't - * use significant extra resources. */ - private adsClientV2: AggregatedDiscoveryServiceClientV2 | null = null; - private adsClientV3: AggregatedDiscoveryServiceClientV3 | null = null; - /* TypeScript typing is structural, so we can take advantage of the fact that - * the output structures for the two call types are identical. */ - private adsCallV2: ClientDuplexStream< - DiscoveryRequestV2, - DiscoveryResponse__Output - > | null = null; - private adsCallV3: ClientDuplexStream< - DiscoveryRequestV3, + private adsNode: Node | null = null; + private adsClient: AggregatedDiscoveryServiceClient | null = null; + private adsCall: ClientDuplexStream< + DiscoveryRequest, DiscoveryResponse__Output > | null = null; - private lrsNodeV2: NodeV2 | null = null; - private lrsNodeV3: NodeV3 | null = null; - private lrsClientV2: LoadReportingServiceClientV2 | null = null; - private lrsClientV3: LoadReportingServiceClientV3 | null = null; - private lrsCallV2: ClientDuplexStream< - LoadStatsRequestV2, - LoadStatsResponse__Output - > | null = null; - private lrsCallV3: ClientDuplexStream< - LoadStatsRequestV3, + private lrsNode: Node | null = null; + private lrsClient: LoadReportingServiceClient | null = null; + private lrsCall: ClientDuplexStream< + LoadStatsRequest, LoadStatsResponse__Output > | null = null; private latestLrsSettings: LoadStatsResponse__Output | null = null; @@ -355,48 +325,24 @@ export class XdsClient { }); return; } - if (bootstrapInfo.xdsServers[0].serverFeatures.indexOf('xds_v3') >= 0) { - this.apiVersion = XdsApiVersion.V3; - } else { - this.apiVersion = XdsApiVersion.V2; - } if (bootstrapInfo.xdsServers[0].serverFeatures.indexOf('ignore_resource_deletion') >= 0) { this.adsState.lds.enableIgnoreResourceDeletion(); this.adsState.cds.enableIgnoreResourceDeletion(); } - const nodeV2: NodeV2 = { + const userAgentName = 'gRPC Node Pure JS'; + this.adsNode = { ...bootstrapInfo.node, - build_version: `gRPC Node Pure JS ${clientVersion}`, - user_agent_name: 'gRPC Node Pure JS', - }; - const nodeV3: NodeV3 = { - ...bootstrapInfo.node, - user_agent_name: 'gRPC Node Pure JS', - }; - this.adsNodeV2 = { - ...nodeV2, + user_agent_name: userAgentName, client_features: ['envoy.lb.does_not_support_overprovisioning'], }; - this.adsNodeV3 = { - ...nodeV3, - client_features: ['envoy.lb.does_not_support_overprovisioning'], - }; - this.lrsNodeV2 = { - ...nodeV2, + this.lrsNode = { + ...bootstrapInfo.node, + user_agent_name: userAgentName, client_features: ['envoy.lrs.supports_send_all_clusters'], }; - this.lrsNodeV3 = { - ...nodeV3, - client_features: ['envoy.lrs.supports_send_all_clusters'], - }; - setCsdsClientNode(this.adsNodeV3); - if (this.apiVersion === XdsApiVersion.V2) { - trace('ADS Node: ' + JSON.stringify(this.adsNodeV2, undefined, 2)); - trace('LRS Node: ' + JSON.stringify(this.lrsNodeV2, undefined, 2)); - } else { - trace('ADS Node: ' + JSON.stringify(this.adsNodeV3, undefined, 2)); - trace('LRS Node: ' + JSON.stringify(this.lrsNodeV3, undefined, 2)); - } + setCsdsClientNode(this.adsNode); + trace('ADS Node: ' + JSON.stringify(this.adsNode, undefined, 2)); + trace('LRS Node: ' + JSON.stringify(this.lrsNode, undefined, 2)); const credentialsConfigs = bootstrapInfo.xdsServers[0].channelCreds; let channelCreds: ChannelCredentials | null = null; for (const config of credentialsConfigs) { @@ -421,24 +367,14 @@ export class XdsClient { const serverUri = bootstrapInfo.xdsServers[0].serverUri trace('Starting xDS client connected to server URI ' + bootstrapInfo.xdsServers[0].serverUri); const channel = new Channel(serverUri, channelCreds, channelArgs); - this.adsClientV2 = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService( - serverUri, - channelCreds, - {channelOverride: channel} - ); - this.adsClientV3 = new protoDefinitions.envoy.service.discovery.v3.AggregatedDiscoveryService( + this.adsClient = new protoDefinitions.envoy.service.discovery.v3.AggregatedDiscoveryService( serverUri, channelCreds, {channelOverride: channel} ); this.maybeStartAdsStream(); - this.lrsClientV2 = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService( - serverUri, - channelCreds, - {channelOverride: channel} - ); - this.lrsClientV3 = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService( + this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService( serverUri, channelCreds, {channelOverride: channel} @@ -463,55 +399,36 @@ export class XdsClient { result: HandleResponseResult; serviceKind: AdsServiceKind; } | null = null; - let isV2: boolean; - switch (message.type_url) { - case EDS_TYPE_URL_V2: - case CDS_TYPE_URL_V2: - case RDS_TYPE_URL_V2: - case LDS_TYPE_URL_V2: - isV2 = true; - break; - default: - isV2 = false; - } try { switch (message.type_url) { - case EDS_TYPE_URL_V2: - case EDS_TYPE_URL_V3: + case EDS_TYPE_URL: handleResponseResult = { result: this.adsState.eds.handleResponses( - getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources), - isV2 + getResponseMessages(EDS_TYPE_URL, message.resources) ), serviceKind: 'eds' }; break; - case CDS_TYPE_URL_V2: - case CDS_TYPE_URL_V3: + case CDS_TYPE_URL: handleResponseResult = { result: this.adsState.cds.handleResponses( - getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources), - isV2 + getResponseMessages(CDS_TYPE_URL, message.resources) ), serviceKind: 'cds' }; break; - case RDS_TYPE_URL_V2: - case RDS_TYPE_URL_V3: + case RDS_TYPE_URL: handleResponseResult = { result: this.adsState.rds.handleResponses( - getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources), - isV2 + getResponseMessages(RDS_TYPE_URL, message.resources) ), serviceKind: 'rds' }; break; - case LDS_TYPE_URL_V2: - case LDS_TYPE_URL_V3: + case LDS_TYPE_URL: handleResponseResult = { result: this.adsState.lds.handleResponses( - getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources), - isV2 + getResponseMessages(LDS_TYPE_URL, message.resources) ), serviceKind: 'lds' } @@ -548,8 +465,7 @@ export class XdsClient { trace( 'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details ); - this.adsCallV2 = null; - this.adsCallV3 = null; + this.adsCall = null; if (streamStatus.code !== status.OK) { this.reportStreamError(streamStatus); } @@ -560,48 +476,6 @@ export class XdsClient { } } - private maybeStartAdsStreamV2(): boolean { - if (this.apiVersion !== XdsApiVersion.V2) { - return false; - } - if (this.adsClientV2 === null) { - return false; - } - if (this.adsCallV2 !== null) { - return false; - } - this.adsCallV2 = this.adsClientV2.StreamAggregatedResources(); - this.adsCallV2.on('data', (message: DiscoveryResponse__Output) => { - this.handleAdsResponse(message); - }); - this.adsCallV2.on('status', (status: StatusObject) => { - this.handleAdsCallStatus(status); - }); - this.adsCallV2.on('error', () => {}); - return true; - } - - private maybeStartAdsStreamV3(): boolean { - if (this.apiVersion !== XdsApiVersion.V3) { - return false; - } - if (this.adsClientV3 === null) { - return false; - } - if (this.adsCallV3 !== null) { - return false; - } - this.adsCallV3 = this.adsClientV3.StreamAggregatedResources(); - this.adsCallV3.on('data', (message: DiscoveryResponse__Output) => { - this.handleAdsResponse(message); - }); - this.adsCallV3.on('status', (status: StatusObject) => { - this.handleAdsCallStatus(status); - }); - this.adsCallV3.on('error', () => {}); - return true; - } - /** * Start the ADS stream if the client exists and there is not already an * existing stream, and there are resources to request. @@ -616,73 +490,55 @@ export class XdsClient { this.adsState.lds.getResourceNames().length === 0) { return; } - let streamStarted: boolean; - if (this.apiVersion === XdsApiVersion.V2) { - streamStarted = this.maybeStartAdsStreamV2(); - } else { - streamStarted = this.maybeStartAdsStreamV3(); + if (this.adsClient === null) { + return; } - if (streamStarted) { - trace('Started ADS stream'); - // Backoff relative to when we start the request - this.adsBackoff.runOnce(); + if (this.adsCall !== null) { + return; + } + this.adsCall = this.adsClient.StreamAggregatedResources(); + this.adsCall.on('data', (message: DiscoveryResponse__Output) => { + this.handleAdsResponse(message); + }); + this.adsCall.on('status', (status: StatusObject) => { + this.handleAdsCallStatus(status); + }); + this.adsCall.on('error', () => {}); + trace('Started ADS stream'); + // Backoff relative to when we start the request + this.adsBackoff.runOnce(); - const allServiceKinds: AdsServiceKind[] = ['eds', 'cds', 'rds', 'lds']; - for (const service of allServiceKinds) { - const state = this.adsState[service]; - if (state.getResourceNames().length > 0) { - this.updateNames(service); - } + const allServiceKinds: AdsServiceKind[] = ['eds', 'cds', 'rds', 'lds']; + for (const service of allServiceKinds) { + const state = this.adsState[service]; + if (state.getResourceNames().length > 0) { + this.updateNames(service); } - this.reportAdsStreamStarted(); } + this.reportAdsStreamStarted(); } private maybeSendAdsMessage(typeUrl: string, resourceNames: string[], responseNonce: string, versionInfo: string, errorMessage?: string) { - if (this.apiVersion === XdsApiVersion.V2) { - this.adsCallV2?.write({ - node: this.adsNodeV2!, - type_url: typeUrl, - resource_names: resourceNames, - response_nonce: responseNonce, - version_info: versionInfo, - error_detail: errorMessage ? { message: errorMessage } : undefined - }); - } else { - this.adsCallV3?.write({ - node: this.adsNodeV3!, - type_url: typeUrl, - resource_names: resourceNames, - response_nonce: responseNonce, - version_info: versionInfo, - error_detail: errorMessage ? { message: errorMessage } : undefined - }); - } + this.adsCall?.write({ + node: this.adsNode!, + type_url: typeUrl, + resource_names: resourceNames, + response_nonce: responseNonce, + version_info: versionInfo, + error_detail: errorMessage ? { message: errorMessage } : undefined + }); } private getTypeUrl(serviceKind: AdsServiceKind): AdsTypeUrl { - if (this.apiVersion === XdsApiVersion.V2) { - switch (serviceKind) { - case 'eds': - return EDS_TYPE_URL_V2; - case 'cds': - return CDS_TYPE_URL_V2; - case 'rds': - return RDS_TYPE_URL_V2; - case 'lds': - return LDS_TYPE_URL_V2; - } - } else { - switch (serviceKind) { - case 'eds': - return EDS_TYPE_URL_V3; - case 'cds': - return CDS_TYPE_URL_V3; - case 'rds': - return RDS_TYPE_URL_V3; - case 'lds': - return LDS_TYPE_URL_V3; - } + switch (serviceKind) { + case 'eds': + return EDS_TYPE_URL; + case 'cds': + return CDS_TYPE_URL; + case 'rds': + return RDS_TYPE_URL; + case 'lds': + return LDS_TYPE_URL; } } @@ -708,20 +564,16 @@ export class XdsClient { let versionInfo: string; let serviceKind: AdsServiceKind | null; switch (typeUrl) { - case EDS_TYPE_URL_V2: - case EDS_TYPE_URL_V3: + case EDS_TYPE_URL: serviceKind = 'eds'; break; - case CDS_TYPE_URL_V2: - case CDS_TYPE_URL_V3: + case CDS_TYPE_URL: serviceKind = 'cds'; break; - case RDS_TYPE_URL_V2: - case RDS_TYPE_URL_V3: + case RDS_TYPE_URL: serviceKind = 'rds'; break; - case LDS_TYPE_URL_V2: - case LDS_TYPE_URL_V3: + case LDS_TYPE_URL: serviceKind = 'lds'; break; default: @@ -731,7 +583,7 @@ export class XdsClient { if (serviceKind) { this.adsState[serviceKind].reportStreamError({ code: status.UNAVAILABLE, - details: message + ' Node ID=' + this.adsNodeV3!.id, + details: message + ' Node ID=' + this.adsNode!.id, metadata: new Metadata() }); resourceNames = this.adsState[serviceKind].getResourceNames(); @@ -750,19 +602,15 @@ export class XdsClient { this.adsState.cds.getResourceNames().length === 0 && this.adsState.rds.getResourceNames().length === 0 && this.adsState.lds.getResourceNames().length === 0) { - this.adsCallV2?.end(); - this.adsCallV2 = null; - this.adsCallV3?.end(); - this.adsCallV3 = null; - this.lrsCallV2?.end(); - this.lrsCallV2 = null; - this.lrsCallV3?.end(); - this.lrsCallV3 = null; + this.adsCall?.end(); + this.adsCall = null; + this.lrsCall?.end(); + this.lrsCall = null; return; } this.maybeStartAdsStream(); this.maybeStartLrsStream(); - if (!this.adsCallV2 && !this.adsCallV3) { + if (!this.adsCall) { /* If the stream is not set up yet at this point, shortcut the rest * becuase nothing will actually be sent. This would mainly happen if * the bootstrap file has not been read yet. In that case, the output @@ -776,7 +624,7 @@ export class XdsClient { } private reportStreamError(status: StatusObject) { - status = {...status, details: status.details + ' Node ID=' + this.adsNodeV3!.id}; + status = {...status, details: status.details + ' Node ID=' + this.adsNode!.id}; this.adsState.eds.reportStreamError(status); this.adsState.cds.reportStreamError(status); this.adsState.rds.reportStreamError(status); @@ -823,8 +671,7 @@ export class XdsClient { trace( 'LRS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details ); - this.lrsCallV2 = null; - this.lrsCallV3 = null; + this.lrsCall = null; clearInterval(this.statsTimer); /* If the backoff timer is no longer running, we do not need to wait any * more to start the new call. */ @@ -833,41 +680,22 @@ export class XdsClient { } } - private maybeStartLrsStreamV2(): boolean { - if (!this.lrsClientV2) { - return false; - } - if (this.lrsCallV2) { - return false; - } - this.lrsCallV2 = this.lrsClientV2.streamLoadStats(); - this.receivedLrsSettingsForCurrentStream = false; - this.lrsCallV2.on('data', (message: LoadStatsResponse__Output) => { - this.handleLrsResponse(message); - }); - this.lrsCallV2.on('status', (status: StatusObject) => { - this.handleLrsCallStatus(status); - }); - this.lrsCallV2.on('error', () => {}); - return true; - } - private maybeStartLrsStreamV3(): boolean { - if (!this.lrsClientV3) { + if (!this.lrsClient) { return false; } - if (this.lrsCallV3) { + if (this.lrsCall) { return false; } - this.lrsCallV3 = this.lrsClientV3.streamLoadStats(); + this.lrsCall = this.lrsClient.streamLoadStats(); this.receivedLrsSettingsForCurrentStream = false; - this.lrsCallV3.on('data', (message: LoadStatsResponse__Output) => { + this.lrsCall.on('data', (message: LoadStatsResponse__Output) => { this.handleLrsResponse(message); }); - this.lrsCallV3.on('status', (status: StatusObject) => { + this.lrsCall.on('status', (status: StatusObject) => { this.handleLrsCallStatus(status); }); - this.lrsCallV3.on('error', () => {}); + this.lrsCall.on('error', () => {}); return true; } @@ -881,39 +709,37 @@ export class XdsClient { this.adsState.lds.getResourceNames().length === 0) { return; } - - let streamStarted: boolean; - if (this.apiVersion === XdsApiVersion.V2) { - streamStarted = this.maybeStartLrsStreamV2(); - } else { - streamStarted = this.maybeStartLrsStreamV3(); + if (!this.lrsClient) { + return; } - - if (streamStarted) { - trace('Starting LRS stream'); - this.lrsBackoff.runOnce(); - /* Send buffered stats information when starting LRS stream. If there is no - * buffered stats information, it will still send the node field. */ - this.sendStats(); + if (this.lrsCall) { + return; } + this.lrsCall = this.lrsClient.streamLoadStats(); + this.receivedLrsSettingsForCurrentStream = false; + this.lrsCall.on('data', (message: LoadStatsResponse__Output) => { + this.handleLrsResponse(message); + }); + this.lrsCall.on('status', (status: StatusObject) => { + this.handleLrsCallStatus(status); + }); + this.lrsCall.on('error', () => {}); + trace('Starting LRS stream'); + this.lrsBackoff.runOnce(); + /* Send buffered stats information when starting LRS stream. If there is no + * buffered stats information, it will still send the node field. */ + this.sendStats(); } private maybeSendLrsMessage(clusterStats: ClusterStats[]) { - if (this.apiVersion === XdsApiVersion.V2) { - this.lrsCallV2?.write({ - node: this.lrsNodeV2!, - cluster_stats: clusterStats - }); - } else { - this.lrsCallV3?.write({ - node: this.lrsNodeV3!, - cluster_stats: clusterStats - }); - } + this.lrsCall?.write({ + node: this.lrsNode!, + cluster_stats: clusterStats + }); } private sendStats() { - if (this.lrsCallV2 === null && this.lrsCallV3 === null) { + if (this.lrsCall === null) { return; } if (!this.latestLrsSettings) { @@ -1121,14 +947,10 @@ export class XdsClient { } private shutdown(): void { - this.adsCallV2?.cancel(); - this.adsCallV3?.cancel(); - this.adsClientV2?.close(); - this.adsClientV3?.close(); - this.lrsCallV2?.cancel(); - this.lrsCallV3?.cancel(); - this.lrsClientV2?.close(); - this.lrsClientV3?.close(); + this.adsCall?.cancel(); + this.adsClient?.close(); + this.lrsCall?.cancel(); + this.lrsClient?.close(); this.hasShutdown = true; } } diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts index bd5b6423..c215076d 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -19,7 +19,7 @@ import { experimental, logVerbosity } from "@grpc/grpc-js"; import { Listener__Output } from '../generated/envoy/config/listener/v3/Listener'; import { RdsState } from "./rds-state"; import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state"; -import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V2, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from '../resources'; +import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from '../resources'; import { getTopLevelFilterUrl, validateTopLevelFilter } from '../http-filter'; import { EXPERIMENTAL_FAULT_INJECTION } from '../environment'; @@ -46,18 +46,17 @@ export class LdsState extends BaseXdsStreamState implements Xd super(updateResourceNames); } - public validateResponse(message: Listener__Output, isV2: boolean): boolean { + public validateResponse(message: Listener__Output): boolean { if ( !( message.api_listener?.api_listener && - (message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL_V2 || - message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL_V3) + message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL ) ) { return false; } - const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener.value); - if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { + const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, message.api_listener!.api_listener.value); + if (EXPERIMENTAL_FAULT_INJECTION) { const filterNames = new Set(); for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) { if (filterNames.has(httpFilter.name)) { @@ -89,7 +88,7 @@ export class LdsState extends BaseXdsStreamState implements Xd case 'rds': return !!httpConnectionManager.rds?.config_source?.ads; case 'route_config': - return this.rdsState.validateResponse(httpConnectionManager.route_config!, isV2); + return this.rdsState.validateResponse(httpConnectionManager.route_config!); } return false; } diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index a5d3c47c..119ac6b9 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -40,7 +40,7 @@ export class RdsState extends BaseXdsStreamState imp protected getProtocolName(): string { return 'RDS'; } - validateResponse(message: RouteConfiguration__Output, isV2: boolean): boolean { + validateResponse(message: RouteConfiguration__Output): boolean { // https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation for (const virtualHost of message.virtual_hosts) { for (const domainPattern of virtualHost.domains) { @@ -55,7 +55,7 @@ export class RdsState extends BaseXdsStreamState imp return false; } } - if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { + if (EXPERIMENTAL_FAULT_INJECTION) { for (const filterConfig of Object.values(virtualHost.typed_per_filter_config ?? {})) { if (!validateOverrideFilter(filterConfig)) { return false; @@ -81,7 +81,7 @@ export class RdsState extends BaseXdsStreamState imp if ((route.route === undefined) || (route.route === null) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) { return false; } - if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { + if (EXPERIMENTAL_FAULT_INJECTION) { for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) { if (!validateOverrideFilter(filterConfig)) { return false; @@ -99,7 +99,7 @@ export class RdsState extends BaseXdsStreamState imp if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) { return false; } - if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { + if (EXPERIMENTAL_FAULT_INJECTION) { for (const weightedCluster of route.route!.weighted_clusters!.clusters) { for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) { if (!validateOverrideFilter(filterConfig)) { diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index 7b3bc018..86c2cea4 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -26,7 +26,7 @@ export interface Watcher { * message type into a library-specific configuration object type, to * remove a lot of duplicate logic, including logic for handling that * flag. */ - onValidUpdate(update: UpdateType, isV2: boolean): void; + onValidUpdate(update: UpdateType): void; onTransientError(error: StatusObject): void; onResourceDoesNotExist(): void; } @@ -85,7 +85,6 @@ export abstract class BaseXdsStreamState implements XdsStreamState nonce = ''; private subscriptions: Map> = new Map>(); - private latestIsV2 = false; private isAdsStreamRunning = false; private ignoreResourceDeletion = false; @@ -128,7 +127,7 @@ export abstract class BaseXdsStreamState implements XdsStreamState * the same happens here */ process.nextTick(() => { this.trace('Reporting existing update for new watcher for name ' + name); - watcher.onValidUpdate(cachedResponse, this.latestIsV2); + watcher.onValidUpdate(cachedResponse); }); } if (addedName) { @@ -157,7 +156,7 @@ export abstract class BaseXdsStreamState implements XdsStreamState getResourceNames(): string[] { return Array.from(this.subscriptions.keys()); } - handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { + handleResponses(responses: ResourcePair[]): HandleResponseResult { const validResponses: ResponseType[] = []; let result: HandleResponseResult = { accepted: [], @@ -166,7 +165,7 @@ export abstract class BaseXdsStreamState implements XdsStreamState } for (const {resource, raw} of responses) { const resourceName = this.getResourceName(resource); - if (this.validateResponse(resource, isV2)) { + if (this.validateResponse(resource)) { validResponses.push(resource); result.accepted.push({ name: resourceName, @@ -180,7 +179,6 @@ export abstract class BaseXdsStreamState implements XdsStreamState }); } } - this.latestIsV2 = isV2; const allResourceNames = new Set(); for (const resource of validResponses) { const resourceName = this.getResourceName(resource); @@ -189,7 +187,7 @@ export abstract class BaseXdsStreamState implements XdsStreamState if (subscriptionEntry) { const watchers = subscriptionEntry.watchers; for (const watcher of watchers) { - watcher.onValidUpdate(resource, isV2); + watcher.onValidUpdate(resource); } clearTimeout(subscriptionEntry.resourceTimer); subscriptionEntry.cachedResponse = resource; @@ -259,9 +257,8 @@ export abstract class BaseXdsStreamState implements XdsStreamState * This function is public so that the LDS validateResponse can call into * the RDS validateResponse. * @param resource The resource object sent by the xDS server - * @param isV2 If true, the resource is an xDS V2 resource instead of xDS V3 */ - public abstract validateResponse(resource: ResponseType, isV2: boolean): boolean; + public abstract validateResponse(resource: ResponseType): boolean; /** * Get the name of a resource object. The name is some field of the object, so * getting it depends on the specific type.