mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
grpc-js-xds: Allow tests to set bootstrap info in channel args
This commit is contained in:
parent
6bc6b8665b
commit
e32bbc7aac
@ -125,6 +125,7 @@ export class CdsLoadBalancer implements LoadBalancer {
|
||||
|
||||
private latestConfig: CdsLoadBalancingConfig | null = null;
|
||||
private latestAttributes: { [key: string]: unknown } = {};
|
||||
private xdsClient: XdsClient | null = null;
|
||||
|
||||
constructor(private readonly channelControlHelper: ChannelControlHelper) {
|
||||
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
|
||||
@ -188,6 +189,7 @@ export class CdsLoadBalancer implements LoadBalancer {
|
||||
}
|
||||
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
|
||||
this.latestAttributes = attributes;
|
||||
this.xdsClient = attributes.xdsClient as XdsClient;
|
||||
|
||||
/* If the cluster is changing, disable the old watcher before adding the new
|
||||
* one */
|
||||
@ -196,7 +198,7 @@ export class CdsLoadBalancer implements LoadBalancer {
|
||||
this.latestConfig?.getCluster() !== lbConfig.getCluster()
|
||||
) {
|
||||
trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster());
|
||||
getSingletonXdsClient().removeClusterWatcher(
|
||||
this.xdsClient.removeClusterWatcher(
|
||||
this.latestConfig!.getCluster(),
|
||||
this.watcher
|
||||
);
|
||||
@ -212,7 +214,7 @@ export class CdsLoadBalancer implements LoadBalancer {
|
||||
|
||||
if (!this.isWatcherActive) {
|
||||
trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster());
|
||||
getSingletonXdsClient().addClusterWatcher(lbConfig.getCluster(), this.watcher);
|
||||
this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher);
|
||||
this.isWatcherActive = true;
|
||||
}
|
||||
}
|
||||
@ -226,7 +228,7 @@ export class CdsLoadBalancer implements LoadBalancer {
|
||||
trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster());
|
||||
this.childBalancer.destroy();
|
||||
if (this.isWatcherActive) {
|
||||
getSingletonXdsClient().removeClusterWatcher(
|
||||
this.xdsClient?.removeClusterWatcher(
|
||||
this.latestConfig!.getCluster(),
|
||||
this.watcher
|
||||
);
|
||||
|
||||
@ -167,6 +167,7 @@ export class EdsLoadBalancer implements LoadBalancer {
|
||||
|
||||
private lastestConfig: EdsLoadBalancingConfig | null = null;
|
||||
private latestAttributes: { [key: string]: unknown } = {};
|
||||
private xdsClient: XdsClient | null = null;
|
||||
private latestEdsUpdate: ClusterLoadAssignment__Output | null = null;
|
||||
|
||||
/**
|
||||
@ -488,13 +489,14 @@ export class EdsLoadBalancer implements LoadBalancer {
|
||||
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
|
||||
this.lastestConfig = lbConfig;
|
||||
this.latestAttributes = attributes;
|
||||
this.xdsClient = attributes.xdsClient as XdsClient;
|
||||
const newEdsServiceName = lbConfig.getEdsServiceName() ?? lbConfig.getCluster();
|
||||
|
||||
/* If the name is changing, disable the old watcher before adding the new
|
||||
* one */
|
||||
if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) {
|
||||
trace('Removing old endpoint watcher for edsServiceName ' + this.edsServiceName)
|
||||
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName!, this.watcher);
|
||||
this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher);
|
||||
/* Setting isWatcherActive to false here lets us have one code path for
|
||||
* calling addEndpointWatcher */
|
||||
this.isWatcherActive = false;
|
||||
@ -507,12 +509,12 @@ export class EdsLoadBalancer implements LoadBalancer {
|
||||
|
||||
if (!this.isWatcherActive) {
|
||||
trace('Adding new endpoint watcher for edsServiceName ' + this.edsServiceName);
|
||||
getSingletonXdsClient().addEndpointWatcher(this.edsServiceName, this.watcher);
|
||||
this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher);
|
||||
this.isWatcherActive = true;
|
||||
}
|
||||
|
||||
if (lbConfig.getLrsLoadReportingServerName()) {
|
||||
this.clusterDropStats = getSingletonXdsClient().addClusterDropStats(
|
||||
this.clusterDropStats = this.xdsClient.addClusterDropStats(
|
||||
lbConfig.getLrsLoadReportingServerName()!,
|
||||
lbConfig.getCluster(),
|
||||
lbConfig.getEdsServiceName() ?? ''
|
||||
@ -533,7 +535,7 @@ export class EdsLoadBalancer implements LoadBalancer {
|
||||
destroy(): void {
|
||||
trace('Destroying load balancer with edsServiceName ' + this.edsServiceName);
|
||||
if (this.edsServiceName) {
|
||||
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName, this.watcher);
|
||||
this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher);
|
||||
}
|
||||
this.childBalancer.destroy();
|
||||
}
|
||||
|
||||
@ -169,7 +169,7 @@ export class LrsLoadBalancer implements LoadBalancer {
|
||||
if (!(lbConfig instanceof LrsLoadBalancingConfig)) {
|
||||
return;
|
||||
}
|
||||
this.localityStatsReporter = getSingletonXdsClient().addClusterLocalityStats(
|
||||
this.localityStatsReporter = (attributes.xdsClient as XdsClient).addClusterLocalityStats(
|
||||
lbConfig.getLrsLoadReportingServerName(),
|
||||
lbConfig.getClusterName(),
|
||||
lbConfig.getEdsServiceName(),
|
||||
|
||||
@ -48,6 +48,7 @@ import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from './environment'
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
import RetryPolicy = experimental.RetryPolicy;
|
||||
import { validateBootstrapConfig } from './xds-bootstrap';
|
||||
|
||||
const TRACER_NAME = 'xds_resolver';
|
||||
|
||||
@ -210,6 +211,8 @@ function getDefaultRetryMaxInterval(baseInterval: string): string {
|
||||
return `${Number.parseFloat(baseInterval.substring(0, baseInterval.length - 1)) * 10}s`;
|
||||
}
|
||||
|
||||
const BOOTSTRAP_CONFIG_KEY = 'grpc.TEST_ONLY_DO_NOT_USE_IN_PROD.xds_bootstrap_config';
|
||||
|
||||
const RETRY_CODES: {[key: string]: status} = {
|
||||
'cancelled': status.CANCELLED,
|
||||
'deadline-exceeded': status.DEADLINE_EXCEEDED,
|
||||
@ -238,11 +241,20 @@ class XdsResolver implements Resolver {
|
||||
|
||||
private ldsHttpFilterConfigs: {name: string, config: HttpFilterConfig}[] = [];
|
||||
|
||||
private xdsClient: XdsClient;
|
||||
|
||||
constructor(
|
||||
private target: GrpcUri,
|
||||
private listener: ResolverListener,
|
||||
private channelOptions: ChannelOptions
|
||||
) {
|
||||
if (channelOptions[BOOTSTRAP_CONFIG_KEY]) {
|
||||
const parsedConfig = JSON.parse(channelOptions[BOOTSTRAP_CONFIG_KEY]);
|
||||
const validatedConfig = validateBootstrapConfig(parsedConfig);
|
||||
this.xdsClient = new XdsClient(validatedConfig);
|
||||
} else {
|
||||
this.xdsClient = getSingletonXdsClient();
|
||||
}
|
||||
this.ldsWatcher = {
|
||||
onValidUpdate: (update: Listener__Output) => {
|
||||
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, update.api_listener!.api_listener!.value);
|
||||
@ -267,16 +279,16 @@ class XdsResolver implements Resolver {
|
||||
const routeConfigName = httpConnectionManager.rds!.route_config_name;
|
||||
if (this.latestRouteConfigName !== routeConfigName) {
|
||||
if (this.latestRouteConfigName !== null) {
|
||||
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
|
||||
this.xdsClient.removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
|
||||
}
|
||||
getSingletonXdsClient().addRouteWatcher(httpConnectionManager.rds!.route_config_name, this.rdsWatcher);
|
||||
this.xdsClient.addRouteWatcher(httpConnectionManager.rds!.route_config_name, this.rdsWatcher);
|
||||
this.latestRouteConfigName = routeConfigName;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'route_config':
|
||||
if (this.latestRouteConfigName) {
|
||||
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
|
||||
this.xdsClient.removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
|
||||
}
|
||||
this.handleRouteConfig(httpConnectionManager.route_config!);
|
||||
break;
|
||||
@ -546,7 +558,7 @@ class XdsResolver implements Resolver {
|
||||
methodConfig: [],
|
||||
loadBalancingConfig: [lbPolicyConfig]
|
||||
}
|
||||
this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {});
|
||||
this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {xdsClient: this.xdsClient});
|
||||
}
|
||||
|
||||
private reportResolutionError(reason: string) {
|
||||
@ -563,15 +575,15 @@ class XdsResolver implements Resolver {
|
||||
// Wait until updateResolution is called once to start the xDS requests
|
||||
if (!this.isLdsWatcherActive) {
|
||||
trace('Starting resolution for target ' + uriToString(this.target));
|
||||
getSingletonXdsClient().addListenerWatcher(this.target.path, this.ldsWatcher);
|
||||
this.xdsClient.addListenerWatcher(this.target.path, this.ldsWatcher);
|
||||
this.isLdsWatcherActive = true;
|
||||
}
|
||||
}
|
||||
|
||||
destroy() {
|
||||
getSingletonXdsClient().removeListenerWatcher(this.target.path, this.ldsWatcher);
|
||||
this.xdsClient.removeListenerWatcher(this.target.path, this.ldsWatcher);
|
||||
if (this.latestRouteConfigName) {
|
||||
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
|
||||
this.xdsClient.removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -231,7 +231,7 @@ function validateNode(obj: any): Node {
|
||||
return result;
|
||||
}
|
||||
|
||||
function validateBootstrapFile(obj: any): BootstrapInfo {
|
||||
export function validateBootstrapConfig(obj: any): BootstrapInfo {
|
||||
return {
|
||||
xdsServers: obj.xds_servers.map(validateXdsServerConfig),
|
||||
node: validateNode(obj.node),
|
||||
@ -265,7 +265,7 @@ export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
|
||||
}
|
||||
try {
|
||||
const parsedFile = JSON.parse(data);
|
||||
resolve(validateBootstrapFile(parsedFile));
|
||||
resolve(validateBootstrapConfig(parsedFile));
|
||||
} catch (e) {
|
||||
reject(
|
||||
new Error(
|
||||
@ -290,7 +290,7 @@ export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
|
||||
if (bootstrapConfig) {
|
||||
try {
|
||||
const parsedConfig = JSON.parse(bootstrapConfig);
|
||||
const loadedBootstrapInfoValue = validateBootstrapFile(parsedConfig);
|
||||
const loadedBootstrapInfoValue = validateBootstrapConfig(parsedConfig);
|
||||
loadedBootstrapInfo = Promise.resolve(loadedBootstrapInfoValue);
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
|
||||
@ -21,7 +21,7 @@ import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
|
||||
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel, connectivityState } from '@grpc/grpc-js';
|
||||
import * as adsTypes from './generated/ads';
|
||||
import * as lrsTypes from './generated/lrs';
|
||||
import { loadBootstrapInfo } from './xds-bootstrap';
|
||||
import { BootstrapInfo, loadBootstrapInfo } from './xds-bootstrap';
|
||||
import { Node } from './generated/envoy/config/core/v3/Node';
|
||||
import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService';
|
||||
import { DiscoveryRequest } from './generated/envoy/service/discovery/v3/DiscoveryRequest';
|
||||
@ -276,7 +276,7 @@ export class XdsClient {
|
||||
private adsBackoff: BackoffTimeout;
|
||||
private lrsBackoff: BackoffTimeout;
|
||||
|
||||
constructor() {
|
||||
constructor(bootstrapInfoOverride?: BootstrapInfo) {
|
||||
const edsState = new EdsState(() => {
|
||||
this.updateNames('eds');
|
||||
});
|
||||
@ -310,7 +310,15 @@ export class XdsClient {
|
||||
});
|
||||
this.lrsBackoff.unref();
|
||||
|
||||
Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(
|
||||
async function getBootstrapInfo(): Promise<BootstrapInfo> {
|
||||
if (bootstrapInfoOverride) {
|
||||
return bootstrapInfoOverride;
|
||||
} else {
|
||||
return loadBootstrapInfo();
|
||||
}
|
||||
}
|
||||
|
||||
Promise.all([getBootstrapInfo(), loadAdsProtos()]).then(
|
||||
([bootstrapInfo, protoDefinitions]) => {
|
||||
if (this.hasShutdown) {
|
||||
return;
|
||||
|
||||
@ -61,10 +61,12 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
|
||||
const priorityTotalWeights: Map<number, number> = new Map();
|
||||
for (const endpoint of message.endpoints) {
|
||||
if (!endpoint.locality) {
|
||||
trace('EDS validation: endpoint locality unset');
|
||||
return false;
|
||||
}
|
||||
for (const {locality, priority} of seenLocalities) {
|
||||
if (localitiesEqual(endpoint.locality, locality) && endpoint.priority === priority) {
|
||||
trace('EDS validation: endpoint locality duplicated: ' + JSON.stringify(locality) + ', priority=' + priority);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -72,16 +74,20 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
|
||||
for (const lb of endpoint.lb_endpoints) {
|
||||
const socketAddress = lb.endpoint?.address?.socket_address;
|
||||
if (!socketAddress) {
|
||||
trace('EDS validation: endpoint socket_address not set');
|
||||
return false;
|
||||
}
|
||||
if (socketAddress.port_specifier !== 'port_value') {
|
||||
trace('EDS validation: socket_address.port_specifier !== "port_value"');
|
||||
return false;
|
||||
}
|
||||
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
|
||||
trace('EDS validation: address not a valid IPv4 or IPv6 address: ' + socketAddress.address);
|
||||
return false;
|
||||
}
|
||||
for (const address of seenAddresses) {
|
||||
if (addressesEqual(socketAddress, address)) {
|
||||
trace('EDS validation: duplicate address seen: ' + address);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -91,11 +97,13 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
|
||||
}
|
||||
for (const totalWeight of priorityTotalWeights.values()) {
|
||||
if (totalWeight > UINT32_MAX) {
|
||||
trace('EDS validation: total weight > UINT32_MAX')
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for (const priority of priorityTotalWeights.keys()) {
|
||||
if (priority > 0 && !priorityTotalWeights.has(priority - 1)) {
|
||||
trace('EDS validation: priorities not contiguous');
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user