diff --git a/packages/grpc-js-xds/src/load-balancer-cds.ts b/packages/grpc-js-xds/src/load-balancer-cds.ts index 4d47b254..243a1e96 100644 --- a/packages/grpc-js-xds/src/load-balancer-cds.ts +++ b/packages/grpc-js-xds/src/load-balancer-cds.ts @@ -125,6 +125,7 @@ export class CdsLoadBalancer implements LoadBalancer { private latestConfig: CdsLoadBalancingConfig | null = null; private latestAttributes: { [key: string]: unknown } = {}; + private xdsClient: XdsClient | null = null; constructor(private readonly channelControlHelper: ChannelControlHelper) { this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); @@ -188,6 +189,7 @@ export class CdsLoadBalancer implements LoadBalancer { } trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2)); this.latestAttributes = attributes; + this.xdsClient = attributes.xdsClient as XdsClient; /* If the cluster is changing, disable the old watcher before adding the new * one */ @@ -196,7 +198,7 @@ export class CdsLoadBalancer implements LoadBalancer { this.latestConfig?.getCluster() !== lbConfig.getCluster() ) { trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster()); - getSingletonXdsClient().removeClusterWatcher( + this.xdsClient.removeClusterWatcher( this.latestConfig!.getCluster(), this.watcher ); @@ -212,7 +214,7 @@ export class CdsLoadBalancer implements LoadBalancer { if (!this.isWatcherActive) { trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster()); - getSingletonXdsClient().addClusterWatcher(lbConfig.getCluster(), this.watcher); + this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher); this.isWatcherActive = true; } } @@ -226,7 +228,7 @@ export class CdsLoadBalancer implements LoadBalancer { trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster()); this.childBalancer.destroy(); if (this.isWatcherActive) { - getSingletonXdsClient().removeClusterWatcher( + this.xdsClient?.removeClusterWatcher( this.latestConfig!.getCluster(), this.watcher ); diff --git a/packages/grpc-js-xds/src/load-balancer-eds.ts b/packages/grpc-js-xds/src/load-balancer-eds.ts index b0bd3f03..03a4078a 100644 --- a/packages/grpc-js-xds/src/load-balancer-eds.ts +++ b/packages/grpc-js-xds/src/load-balancer-eds.ts @@ -167,6 +167,7 @@ export class EdsLoadBalancer implements LoadBalancer { private lastestConfig: EdsLoadBalancingConfig | null = null; private latestAttributes: { [key: string]: unknown } = {}; + private xdsClient: XdsClient | null = null; private latestEdsUpdate: ClusterLoadAssignment__Output | null = null; /** @@ -488,13 +489,14 @@ export class EdsLoadBalancer implements LoadBalancer { trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2)); this.lastestConfig = lbConfig; this.latestAttributes = attributes; + this.xdsClient = attributes.xdsClient as XdsClient; const newEdsServiceName = lbConfig.getEdsServiceName() ?? lbConfig.getCluster(); /* If the name is changing, disable the old watcher before adding the new * one */ if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) { trace('Removing old endpoint watcher for edsServiceName ' + this.edsServiceName) - getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName!, this.watcher); + this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher); /* Setting isWatcherActive to false here lets us have one code path for * calling addEndpointWatcher */ this.isWatcherActive = false; @@ -507,12 +509,12 @@ export class EdsLoadBalancer implements LoadBalancer { if (!this.isWatcherActive) { trace('Adding new endpoint watcher for edsServiceName ' + this.edsServiceName); - getSingletonXdsClient().addEndpointWatcher(this.edsServiceName, this.watcher); + this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher); this.isWatcherActive = true; } if (lbConfig.getLrsLoadReportingServerName()) { - this.clusterDropStats = getSingletonXdsClient().addClusterDropStats( + this.clusterDropStats = this.xdsClient.addClusterDropStats( lbConfig.getLrsLoadReportingServerName()!, lbConfig.getCluster(), lbConfig.getEdsServiceName() ?? '' @@ -533,7 +535,7 @@ export class EdsLoadBalancer implements LoadBalancer { destroy(): void { trace('Destroying load balancer with edsServiceName ' + this.edsServiceName); if (this.edsServiceName) { - getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName, this.watcher); + this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher); } this.childBalancer.destroy(); } diff --git a/packages/grpc-js-xds/src/load-balancer-lrs.ts b/packages/grpc-js-xds/src/load-balancer-lrs.ts index 745b21c5..9610ea83 100644 --- a/packages/grpc-js-xds/src/load-balancer-lrs.ts +++ b/packages/grpc-js-xds/src/load-balancer-lrs.ts @@ -169,7 +169,7 @@ export class LrsLoadBalancer implements LoadBalancer { if (!(lbConfig instanceof LrsLoadBalancingConfig)) { return; } - this.localityStatsReporter = getSingletonXdsClient().addClusterLocalityStats( + this.localityStatsReporter = (attributes.xdsClient as XdsClient).addClusterLocalityStats( lbConfig.getLrsLoadReportingServerName(), lbConfig.getClusterName(), lbConfig.getEdsServiceName(), diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 401465be..9879a2c6 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -48,6 +48,7 @@ import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from './environment' import Filter = experimental.Filter; import FilterFactory = experimental.FilterFactory; import RetryPolicy = experimental.RetryPolicy; +import { validateBootstrapConfig } from './xds-bootstrap'; const TRACER_NAME = 'xds_resolver'; @@ -210,6 +211,8 @@ function getDefaultRetryMaxInterval(baseInterval: string): string { return `${Number.parseFloat(baseInterval.substring(0, baseInterval.length - 1)) * 10}s`; } +const BOOTSTRAP_CONFIG_KEY = 'grpc.TEST_ONLY_DO_NOT_USE_IN_PROD.xds_bootstrap_config'; + const RETRY_CODES: {[key: string]: status} = { 'cancelled': status.CANCELLED, 'deadline-exceeded': status.DEADLINE_EXCEEDED, @@ -238,11 +241,20 @@ class XdsResolver implements Resolver { private ldsHttpFilterConfigs: {name: string, config: HttpFilterConfig}[] = []; + private xdsClient: XdsClient; + constructor( private target: GrpcUri, private listener: ResolverListener, private channelOptions: ChannelOptions ) { + if (channelOptions[BOOTSTRAP_CONFIG_KEY]) { + const parsedConfig = JSON.parse(channelOptions[BOOTSTRAP_CONFIG_KEY]); + const validatedConfig = validateBootstrapConfig(parsedConfig); + this.xdsClient = new XdsClient(validatedConfig); + } else { + this.xdsClient = getSingletonXdsClient(); + } this.ldsWatcher = { onValidUpdate: (update: Listener__Output) => { const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, update.api_listener!.api_listener!.value); @@ -267,16 +279,16 @@ class XdsResolver implements Resolver { const routeConfigName = httpConnectionManager.rds!.route_config_name; if (this.latestRouteConfigName !== routeConfigName) { if (this.latestRouteConfigName !== null) { - getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); + this.xdsClient.removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); } - getSingletonXdsClient().addRouteWatcher(httpConnectionManager.rds!.route_config_name, this.rdsWatcher); + this.xdsClient.addRouteWatcher(httpConnectionManager.rds!.route_config_name, this.rdsWatcher); this.latestRouteConfigName = routeConfigName; } break; } case 'route_config': if (this.latestRouteConfigName) { - getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); + this.xdsClient.removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); } this.handleRouteConfig(httpConnectionManager.route_config!); break; @@ -546,7 +558,7 @@ class XdsResolver implements Resolver { methodConfig: [], loadBalancingConfig: [lbPolicyConfig] } - this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {}); + this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {xdsClient: this.xdsClient}); } private reportResolutionError(reason: string) { @@ -563,15 +575,15 @@ class XdsResolver implements Resolver { // Wait until updateResolution is called once to start the xDS requests if (!this.isLdsWatcherActive) { trace('Starting resolution for target ' + uriToString(this.target)); - getSingletonXdsClient().addListenerWatcher(this.target.path, this.ldsWatcher); + this.xdsClient.addListenerWatcher(this.target.path, this.ldsWatcher); this.isLdsWatcherActive = true; } } destroy() { - getSingletonXdsClient().removeListenerWatcher(this.target.path, this.ldsWatcher); + this.xdsClient.removeListenerWatcher(this.target.path, this.ldsWatcher); if (this.latestRouteConfigName) { - getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); + this.xdsClient.removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); } } diff --git a/packages/grpc-js-xds/src/xds-bootstrap.ts b/packages/grpc-js-xds/src/xds-bootstrap.ts index 876b6d95..72a0ca37 100644 --- a/packages/grpc-js-xds/src/xds-bootstrap.ts +++ b/packages/grpc-js-xds/src/xds-bootstrap.ts @@ -231,7 +231,7 @@ function validateNode(obj: any): Node { return result; } -function validateBootstrapFile(obj: any): BootstrapInfo { +export function validateBootstrapConfig(obj: any): BootstrapInfo { return { xdsServers: obj.xds_servers.map(validateXdsServerConfig), node: validateNode(obj.node), @@ -265,7 +265,7 @@ export async function loadBootstrapInfo(): Promise { } try { const parsedFile = JSON.parse(data); - resolve(validateBootstrapFile(parsedFile)); + resolve(validateBootstrapConfig(parsedFile)); } catch (e) { reject( new Error( @@ -290,7 +290,7 @@ export async function loadBootstrapInfo(): Promise { if (bootstrapConfig) { try { const parsedConfig = JSON.parse(bootstrapConfig); - const loadedBootstrapInfoValue = validateBootstrapFile(parsedConfig); + const loadedBootstrapInfoValue = validateBootstrapConfig(parsedConfig); loadedBootstrapInfo = Promise.resolve(loadedBootstrapInfoValue); } catch (e) { throw new Error( diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 2dfa4123..a2d33d1a 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -21,7 +21,7 @@ import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util'; import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel, connectivityState } from '@grpc/grpc-js'; import * as adsTypes from './generated/ads'; import * as lrsTypes from './generated/lrs'; -import { loadBootstrapInfo } from './xds-bootstrap'; +import { BootstrapInfo, loadBootstrapInfo } from './xds-bootstrap'; import { Node } from './generated/envoy/config/core/v3/Node'; import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService'; import { DiscoveryRequest } from './generated/envoy/service/discovery/v3/DiscoveryRequest'; @@ -276,7 +276,7 @@ export class XdsClient { private adsBackoff: BackoffTimeout; private lrsBackoff: BackoffTimeout; - constructor() { + constructor(bootstrapInfoOverride?: BootstrapInfo) { const edsState = new EdsState(() => { this.updateNames('eds'); }); @@ -310,7 +310,15 @@ export class XdsClient { }); this.lrsBackoff.unref(); - Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then( + async function getBootstrapInfo(): Promise { + if (bootstrapInfoOverride) { + return bootstrapInfoOverride; + } else { + return loadBootstrapInfo(); + } + } + + Promise.all([getBootstrapInfo(), loadAdsProtos()]).then( ([bootstrapInfo, protoDefinitions]) => { if (this.hasShutdown) { return; diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts index cec6a476..b043ebbc 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -61,10 +61,12 @@ export class EdsState extends BaseXdsStreamState const priorityTotalWeights: Map = new Map(); for (const endpoint of message.endpoints) { if (!endpoint.locality) { + trace('EDS validation: endpoint locality unset'); return false; } for (const {locality, priority} of seenLocalities) { if (localitiesEqual(endpoint.locality, locality) && endpoint.priority === priority) { + trace('EDS validation: endpoint locality duplicated: ' + JSON.stringify(locality) + ', priority=' + priority); return false; } } @@ -72,16 +74,20 @@ export class EdsState extends BaseXdsStreamState for (const lb of endpoint.lb_endpoints) { const socketAddress = lb.endpoint?.address?.socket_address; if (!socketAddress) { + trace('EDS validation: endpoint socket_address not set'); return false; } if (socketAddress.port_specifier !== 'port_value') { + trace('EDS validation: socket_address.port_specifier !== "port_value"'); return false; } if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { + trace('EDS validation: address not a valid IPv4 or IPv6 address: ' + socketAddress.address); return false; } for (const address of seenAddresses) { if (addressesEqual(socketAddress, address)) { + trace('EDS validation: duplicate address seen: ' + address); return false; } } @@ -91,11 +97,13 @@ export class EdsState extends BaseXdsStreamState } for (const totalWeight of priorityTotalWeights.values()) { if (totalWeight > UINT32_MAX) { + trace('EDS validation: total weight > UINT32_MAX') return false; } } for (const priority of priorityTotalWeights.keys()) { if (priority > 0 && !priorityTotalWeights.has(priority - 1)) { + trace('EDS validation: priorities not contiguous'); return false; } }