grpc-js-xds: Add xDS v3 support to the client

Add xDS v3 test job
This commit is contained in:
Michael Lumish 2021-04-14 14:02:10 -07:00
parent f3b6eb1c85
commit 6711620c1a
14 changed files with 661 additions and 306 deletions

View File

@ -0,0 +1,16 @@
#!/bin/bash
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
XDS_V3_OPT="--xds_v3_support" $(dirname $0)/xds.sh

1
packages/grpc-js-xds/scripts/xds.sh Normal file → Executable file
View File

@ -58,6 +58,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
--gcp_suffix=$(date '+%s') \
--verbose \
${XDS_V3_OPT-} \
--client_cmd="$(which node) grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \
--server=xds:///{server_uri} \
--stats_port={stats_port} \

View File

@ -17,7 +17,7 @@
import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
import { getSingletonXdsClient, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
import SubchannelAddress = experimental.SubchannelAddress;
import UnavailablePicker = experimental.UnavailablePicker;
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;

View File

@ -17,7 +17,7 @@
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental } from '@grpc/grpc-js';
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client';
import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from './load-balancer-priority';
import LoadBalancer = experimental.LoadBalancer;

View File

@ -26,20 +26,20 @@ import ResolverListener = experimental.ResolverListener;
import uriToString = experimental.uriToString;
import ServiceConfig = experimental.ServiceConfig;
import registerResolver = experimental.registerResolver;
import { Listener__Output } from './generated/envoy/api/v2/Listener';
import { Listener__Output } from './generated/envoy/config/listener/v3/Listener';
import { Watcher } from './xds-stream-state/xds-stream-state';
import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration';
import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration';
import { HttpConnectionManager__Output } from './generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager';
import { CdsLoadBalancingConfig } from './load-balancer-cds';
import { VirtualHost__Output } from './generated/envoy/api/v2/route/VirtualHost';
import { RouteMatch__Output } from './generated/envoy/api/v2/route/RouteMatch';
import { HeaderMatcher__Output } from './generated/envoy/api/v2/route/HeaderMatcher';
import { VirtualHost__Output } from './generated/envoy/config/route/v3/VirtualHost';
import { RouteMatch__Output } from './generated/envoy/config/route/v3/RouteMatch';
import { HeaderMatcher__Output } from './generated/envoy/config/route/v3/HeaderMatcher';
import ConfigSelector = experimental.ConfigSelector;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import { XdsClusterManagerLoadBalancingConfig } from './load-balancer-xds-cluster-manager';
import { ExactValueMatcher, Fraction, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher';
import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action';
import { LogVerbosity } from '@grpc/grpc-js/build/src/constants';
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from './resources';
const TRACER_NAME = 'xds_resolver';
@ -210,9 +210,7 @@ class XdsResolver implements Resolver {
) {
this.ldsWatcher = {
onValidUpdate: (update: Listener__Output) => {
const httpConnectionManager = update.api_listener!
.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, update.api_listener!.api_listener!.value);
switch (httpConnectionManager.route_specifier) {
case 'rds': {
const routeConfigName = httpConnectionManager.rds!.route_config_name;

View File

@ -0,0 +1,96 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// This is a non-public, unstable API, but it's very convenient
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment';
import { Listener__Output } from './generated/envoy/config/listener/v3/Listener';
import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration';
import { HttpConnectionManager__Output } from './generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager';
export const EDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
export const CDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.Cluster';
export const LDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.Listener';
export const RDS_TYPE_URL_V2 = 'type.googleapis.com/envoy.api.v2.RouteConfiguration';
export const EDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment';
export const CDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.cluster.v3.Cluster';
export const LDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.listener.v3.Listener';
export const RDS_TYPE_URL_V3 = 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration';
export type EdsTypeUrl = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment' | 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment';
export type CdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Cluster' | 'type.googleapis.com/envoy.config.cluster.v3.Cluster';
export type LdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Listener' | 'type.googleapis.com/envoy.config.listener.v3.Listener';
export type RdsTypeUrl = 'type.googleapis.com/envoy.api.v2.RouteConfiguration' | 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration';
export type AdsTypeUrl = EdsTypeUrl | CdsTypeUrl | RdsTypeUrl | LdsTypeUrl;
export const HTTP_CONNECTION_MANGER_TYPE_URL_V2 =
'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager';
export const HTTP_CONNECTION_MANGER_TYPE_URL_V3 =
'type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager';
export type HttpConnectionManagerTypeUrl = 'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager' | 'type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager';
/**
* Map type URLs to their corresponding message types
*/
export type AdsOutputType<T extends AdsTypeUrl | HttpConnectionManagerTypeUrl> = T extends EdsTypeUrl
? ClusterLoadAssignment__Output
: T extends CdsTypeUrl
? Cluster__Output
: T extends RdsTypeUrl
? RouteConfiguration__Output
: T extends LdsTypeUrl
? Listener__Output
: HttpConnectionManager__Output;
const resourceRoot = loadProtosWithOptionsSync([
'envoy/config/listener/v3/listener.proto',
'envoy/config/route/v3/route.proto',
'envoy/config/cluster/v3/cluster.proto',
'envoy/config/endpoint/v3/endpoint.proto',
'envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto'], {
keepCase: true,
includeDirs: [
// Paths are relative to src/build
__dirname + '/../../deps/envoy-api/',
__dirname + '/../../deps/udpa/',
__dirname + '/../../deps/googleapis/',
__dirname + '/../../deps/protoc-gen-validate/',
],
}
);
const toObjectOptions = {
longs: String,
enums: String,
defaults: true,
oneofs: true
}
export function decodeSingleResource<T extends AdsTypeUrl | HttpConnectionManagerTypeUrl>(targetTypeUrl: T, message: Buffer): AdsOutputType<T> {
const name = targetTypeUrl.substring(targetTypeUrl.lastIndexOf('/') + 1);
const type = resourceRoot.lookup(name);
if (type) {
const decodedMessage = (type as any).decode(message);
return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as AdsOutputType<T>;
} else {
throw new Error(`ADS Error: unknown resource type ${targetTypeUrl}`);
}
}

View File

@ -17,11 +17,23 @@
import * as fs from 'fs';
import { Struct } from './generated/google/protobuf/Struct';
import { Node } from './generated/envoy/api/v2/core/Node';
import { Value } from './generated/google/protobuf/Value';
/* eslint-disable @typescript-eslint/no-explicit-any */
export interface Locality {
region?: string;
zone?: string;
sub_zone?: string;
}
export interface Node {
id: string,
locality: Locality;
cluster?: string;
metadata?: Struct;
}
export interface ChannelCredsConfig {
type: string;
config?: object;
@ -30,6 +42,7 @@ export interface ChannelCredsConfig {
export interface XdsServerConfig {
serverUri: string;
channelCreds: ChannelCredsConfig[];
serverFeatures: string[];
}
export interface BootstrapInfo {
@ -81,9 +94,22 @@ function validateXdsServerConfig(obj: any): XdsServerConfig {
'xds_servers.channel_creds field: at least one entry is required'
);
}
if ('server_features' in obj) {
if (!Array.isArray(obj.server_features)) {
throw new Error(
`xds_servers.server_features field: expected array, got ${typeof obj.server_features}`
);
}
for (const feature of obj.server_features) {
if (typeof feature !== 'string') {
`xds_servers.server_features field element: expected string, got ${typeof feature}`
}
}
}
return {
serverUri: obj.server_uri,
channelCreds: obj.channel_creds.map(validateChannelCredsConfig),
serverFeatures: obj.server_features
};
}
@ -149,7 +175,10 @@ function getStructFromJson(obj: any): Struct {
* @param obj
*/
function validateNode(obj: any): Node {
const result: Node = {};
const result: Node = {
id: '',
locality: {}
};
if (!('id' in obj)) {
throw new Error('id field missing in node element');
}

View File

@ -16,35 +16,26 @@
*/
import * as protoLoader from '@grpc/proto-loader';
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials } from '@grpc/grpc-js';
// This is a non-public, unstable API, but it's very convenient
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel } from '@grpc/grpc-js';
import * as adsTypes from './generated/ads';
import * as lrsTypes from './generated/lrs';
import { loadBootstrapInfo } from './xds-bootstrap';
import { isIPv4, isIPv6 } from 'net';
import { Node } from './generated/envoy/api/v2/core/Node';
import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/discovery/v2/AggregatedDiscoveryService';
import { DiscoveryRequest } from './generated/envoy/api/v2/DiscoveryRequest';
import { DiscoveryResponse__Output } from './generated/envoy/api/v2/DiscoveryResponse';
import {
ClusterLoadAssignment__Output,
ClusterLoadAssignment,
} from './generated/envoy/api/v2/ClusterLoadAssignment';
import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import { LoadReportingServiceClient } from './generated/envoy/service/load_stats/v2/LoadReportingService';
import { LoadStatsRequest } from './generated/envoy/service/load_stats/v2/LoadStatsRequest';
import { LoadStatsResponse__Output } from './generated/envoy/service/load_stats/v2/LoadStatsResponse';
import {
Locality__Output,
Locality,
} from './generated/envoy/api/v2/core/Locality';
import {
ClusterStats,
_envoy_api_v2_endpoint_ClusterStats_DroppedRequests,
} from './generated/envoy/api/v2/endpoint/ClusterStats';
import { UpstreamLocalityStats } from './generated/envoy/api/v2/endpoint/UpstreamLocalityStats';
import { Listener__Output } from './generated/envoy/api/v2/Listener';
import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration';
import { Node as NodeV2 } from './generated/envoy/api/v2/core/Node';
import { Node as NodeV3 } from './generated/envoy/config/core/v3/Node';
import { AggregatedDiscoveryServiceClient as AggregatedDiscoveryServiceClientV2 } from './generated/envoy/service/discovery/v2/AggregatedDiscoveryService';
import { AggregatedDiscoveryServiceClient as AggregatedDiscoveryServiceClientV3 } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService';
import { DiscoveryRequest as DiscoveryRequestV2 } from './generated/envoy/api/v2/DiscoveryRequest';
import { DiscoveryRequest as DiscoveryRequestV3 } from './generated/envoy/service/discovery/v3/DiscoveryRequest';
import { DiscoveryResponse__Output } from './generated/envoy/service/discovery/v3/DiscoveryResponse';
import { LoadReportingServiceClient as LoadReportingServiceClientV2 } from './generated/envoy/service/load_stats/v2/LoadReportingService';
import { LoadReportingServiceClient as LoadReportingServiceClientV3 } from './generated/envoy/service/load_stats/v3/LoadReportingService';
import { LoadStatsRequest as LoadStatsRequestV2 } from './generated/envoy/service/load_stats/v2/LoadStatsRequest';
import { LoadStatsRequest as LoadStatsRequestV3 } from './generated/envoy/service/load_stats/v3/LoadStatsRequest';
import { LoadStatsResponse__Output } from './generated/envoy/service/load_stats/v3/LoadStatsResponse';
import { Locality, Locality__Output } from './generated/envoy/config/core/v3/Locality';
import { Listener__Output } from './generated/envoy/config/listener/v3/Listener';
import { Any__Output } from './generated/google/protobuf/Any';
import BackoffTimeout = experimental.BackoffTimeout;
import ServiceConfig = experimental.ServiceConfig;
@ -55,6 +46,11 @@ import { CdsState } from './xds-stream-state/cds-state';
import { RdsState } from './xds-stream-state/rds-state';
import { LdsState } from './xds-stream-state/lds-state';
import { Watcher } from './xds-stream-state/xds-stream-state';
import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment';
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration';
import { Duration } from './generated/google/protobuf/Duration';
import { AdsOutputType, AdsTypeUrl, CDS_TYPE_URL_V2, CDS_TYPE_URL_V3, decodeSingleResource, EDS_TYPE_URL_V2, EDS_TYPE_URL_V3, LDS_TYPE_URL_V2, LDS_TYPE_URL_V3, RDS_TYPE_URL_V2, RDS_TYPE_URL_V3 } from './resources';
const TRACER_NAME = 'xds_client';
@ -64,21 +60,6 @@ function trace(text: string): void {
const clientVersion = require('../../package.json').version;
const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
const CDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Cluster';
const LDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Listener';
const RDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.RouteConfiguration';
type EdsTypeUrl = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
type CdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Cluster';
type LdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Listener';
type RdsTypeUrl = 'type.googleapis.com/envoy.api.v2.RouteConfiguration';
type AdsTypeUrl = EdsTypeUrl | CdsTypeUrl | RdsTypeUrl | LdsTypeUrl;
const HTTP_CONNECTION_MANGER_TYPE_URL =
'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager';
let loadedProtos: Promise<
adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
> | null = null;
@ -94,11 +75,8 @@ function loadAdsProtos(): Promise<
[
'envoy/service/discovery/v2/ads.proto',
'envoy/service/load_stats/v2/lrs.proto',
'envoy/api/v2/listener.proto',
'envoy/api/v2/route.proto',
'envoy/api/v2/cluster.proto',
'envoy/api/v2/endpoint.proto',
'envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto',
'envoy/service/discovery/v3/ads.proto',
'envoy/service/load_stats/v3/lrs.proto',
],
{
keepCase: true,
@ -106,7 +84,6 @@ function loadAdsProtos(): Promise<
enums: String,
defaults: true,
oneofs: true,
json: true,
includeDirs: [
// Paths are relative to src/build
__dirname + '/../../deps/envoy-api/',
@ -145,6 +122,32 @@ export interface XdsClusterLocalityStats {
addCallFinished(fail: boolean): void;
}
interface DroppedRequests {
category: string;
dropped_count: number;
}
interface UpstreamLocalityStats {
locality: Locality;
total_issued_requests: number;
total_successful_requests: number;
total_error_requests: number;
total_requests_in_progress: number;
}
/**
* An interface representing the ClusterStats message type, restricted to the
* fields used in this module to ensure compatibility with both v2 and v3 APIs.
*/
interface ClusterStats {
cluster_name: string;
cluster_service_name: string;
dropped_requests: DroppedRequests[];
total_dropped_requests: number;
upstream_locality_stats: UpstreamLocalityStats[];
load_report_interval: Duration
}
interface ClusterLocalityStats {
locality: Locality__Output;
callsStarted: number;
@ -218,39 +221,32 @@ class ClusterLoadReportMap {
}
}
type AdsServiceKind = 'eds' | 'cds' | 'rds' | 'lds';
interface AdsState {
[EDS_TYPE_URL]: EdsState;
[CDS_TYPE_URL]: CdsState;
[RDS_TYPE_URL]: RdsState;
[LDS_TYPE_URL]: LdsState;
eds: EdsState;
cds: CdsState;
rds: RdsState;
lds: LdsState;
}
/**
* Map type URLs to their corresponding message types
*/
type OutputType<T extends AdsTypeUrl> = T extends EdsTypeUrl
? ClusterLoadAssignment__Output
: T extends CdsTypeUrl
? Cluster__Output
: T extends RdsTypeUrl
? RouteConfiguration__Output
: Listener__Output;
enum XdsApiVersion {
V2,
V3
}
function getResponseMessages<T extends AdsTypeUrl>(
typeUrl: T,
targetTypeUrl: T,
allowedTypeUrls: string[],
resources: Any__Output[]
): OutputType<T>[] {
const result: OutputType<T>[] = [];
): AdsOutputType<T>[] {
const result: AdsOutputType<T>[] = [];
for (const resource of resources) {
if (protoLoader.isAnyExtension(resource) && resource['@type'] === typeUrl) {
result.push(resource as protoLoader.AnyExtension & OutputType<T>);
if (allowedTypeUrls.includes(resource.type_url)) {
result.push(decodeSingleResource(targetTypeUrl, resource.value));
} else {
throw new Error(
`ADS Error: Invalid resource type ${
protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url
}, expected ${typeUrl}`
`ADS Error: Invalid resource type ${resource.type_url}, expected ${allowedTypeUrls}`
);
}
}
@ -258,20 +254,39 @@ function getResponseMessages<T extends AdsTypeUrl>(
}
export class XdsClient {
private adsNode: Node | null = null;
private adsClient: AggregatedDiscoveryServiceClient | null = null;
private adsCall: ClientDuplexStream<
DiscoveryRequest,
private apiVersion: XdsApiVersion = XdsApiVersion.V2;
private adsNodeV2: NodeV2 | null = null;
private adsNodeV3: NodeV3 | null = null;
/* A client initiates connections lazily, so the client we don't use won't
* use significant extra resources. */
private adsClientV2: AggregatedDiscoveryServiceClientV2 | null = null;
private adsClientV3: AggregatedDiscoveryServiceClientV3 | null = null;
/* TypeScript typing is structural, so we can take advantage of the fact that
* the output structures for the two call types are identical. */
private adsCallV2: ClientDuplexStream<
DiscoveryRequestV2,
DiscoveryResponse__Output
> | null = null;
private adsCallV3: ClientDuplexStream<
DiscoveryRequestV3,
DiscoveryResponse__Output
> | null = null;
private lrsNode: Node | null = null;
private lrsClient: LoadReportingServiceClient | null = null;
private lrsCall: ClientDuplexStream<
LoadStatsRequest,
private lrsNodeV2: NodeV2 | null = null;
private lrsNodeV3: NodeV3 | null = null;
private lrsClientV2: LoadReportingServiceClientV2 | null = null;
private lrsClientV3: LoadReportingServiceClientV3 | null = null;
private lrsCallV2: ClientDuplexStream<
LoadStatsRequestV2,
LoadStatsResponse__Output
> | null = null;
private lrsCallV3: ClientDuplexStream<
LoadStatsRequestV3,
LoadStatsResponse__Output
> | null = null;
private latestLrsSettings: LoadStatsResponse__Output | null = null;
private receivedLrsSettingsForCurrentStream = false;
private clusterStatsMap: ClusterLoadReportMap = new ClusterLoadReportMap();
private statsTimer: NodeJS.Timer;
@ -285,22 +300,22 @@ export class XdsClient {
constructor() {
const edsState = new EdsState(() => {
this.updateNames(EDS_TYPE_URL);
this.updateNames('eds');
});
const cdsState = new CdsState(edsState, () => {
this.updateNames(CDS_TYPE_URL);
this.updateNames('cds');
});
const rdsState = new RdsState(() => {
this.updateNames(RDS_TYPE_URL);
this.updateNames('rds');
});
const ldsState = new LdsState(rdsState, () => {
this.updateNames(LDS_TYPE_URL);
this.updateNames('lds');
});
this.adsState = {
[EDS_TYPE_URL]: edsState,
[CDS_TYPE_URL]: cdsState,
[RDS_TYPE_URL]: rdsState,
[LDS_TYPE_URL]: ldsState,
eds: edsState,
cds: cdsState,
rds: rdsState,
lds: ldsState,
};
const channelArgs = {
@ -322,17 +337,34 @@ export class XdsClient {
if (this.hasShutdown) {
return;
}
const node: Node = {
if (bootstrapInfo.xdsServers[0].serverFeatures.indexOf('xds_v3') >= 0) {
this.apiVersion = XdsApiVersion.V3;
} else {
this.apiVersion = XdsApiVersion.V2;
}
const nodeV2: NodeV2 = {
...bootstrapInfo.node,
build_version: `gRPC Node Pure JS ${clientVersion}`,
user_agent_name: 'gRPC Node Pure JS',
};
this.adsNode = {
...node,
const nodeV3: NodeV3 = {
...bootstrapInfo.node,
user_agent_name: 'gRPC Node Pure JS',
};
this.adsNodeV2 = {
...nodeV2,
client_features: ['envoy.lb.does_not_support_overprovisioning'],
};
this.lrsNode = {
...node,
this.adsNodeV3 = {
...nodeV3,
client_features: ['envoy.lb.does_not_support_overprovisioning'],
};
this.lrsNodeV2 = {
...nodeV2,
client_features: ['envoy.lrs.supports_send_all_clusters'],
};
this.lrsNodeV3 = {
...nodeV3,
client_features: ['envoy.lrs.supports_send_all_clusters'],
};
const credentialsConfigs = bootstrapInfo.xdsServers[0].channelCreds;
@ -356,18 +388,30 @@ export class XdsClient {
});
return;
}
const serverUri = bootstrapInfo.xdsServers[0].serverUri
trace('Starting xDS client connected to server URI ' + bootstrapInfo.xdsServers[0].serverUri);
this.adsClient = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
bootstrapInfo.xdsServers[0].serverUri,
const channel = new Channel(serverUri, channelCreds, channelArgs);
this.adsClientV2 = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
serverUri,
channelCreds,
channelArgs
{channelOverride: channel}
);
this.adsClientV3 = new protoDefinitions.envoy.service.discovery.v3.AggregatedDiscoveryService(
serverUri,
channelCreds,
{channelOverride: channel}
);
this.maybeStartAdsStream();
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService(
bootstrapInfo.xdsServers[0].serverUri,
this.lrsClientV2 = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService(
serverUri,
channelCreds,
{channelOverride: this.adsClient.getChannel()}
{channelOverride: channel}
);
this.lrsClientV3 = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService(
serverUri,
channelCreds,
{channelOverride: channel}
);
this.maybeStartLrsStream();
},
@ -387,32 +431,40 @@ export class XdsClient {
private handleAdsResponse(message: DiscoveryResponse__Output) {
let errorString: string | null;
/* The cases in this switch statement look redundant but separating them
* out like this is necessary for the typechecker to validate the types
* as narrowly as we need it to. */
let serviceKind: AdsServiceKind;
switch (message.type_url) {
case EDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(
getResponseMessages(message.type_url, message.resources)
case EDS_TYPE_URL_V2:
case EDS_TYPE_URL_V3:
errorString = this.adsState.eds.handleResponses(
getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources)
);
serviceKind = 'eds';
break;
case CDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(
getResponseMessages(message.type_url, message.resources)
case CDS_TYPE_URL_V2:
case CDS_TYPE_URL_V3:
errorString = this.adsState.cds.handleResponses(
getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources)
);
serviceKind = 'cds';
break;
case RDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(
getResponseMessages(message.type_url, message.resources)
case RDS_TYPE_URL_V2:
case RDS_TYPE_URL_V3:
errorString = this.adsState.rds.handleResponses(
getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources)
);
serviceKind = 'rds';
break;
case LDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(
getResponseMessages(message.type_url, message.resources)
case LDS_TYPE_URL_V2:
case LDS_TYPE_URL_V3:
errorString = this.adsState.lds.handleResponses(
getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources)
);
serviceKind = 'lds';
break;
default:
errorString = `Unknown type_url ${message.type_url}`;
// This is not used in this branch, but setting it makes the types easier to handle
serviceKind = 'eds';
}
if (errorString === null) {
trace('Acking message with type URL ' + message.type_url);
@ -420,65 +472,148 @@ export class XdsClient {
* implies that message.type_url is one of the 4 known type URLs, which
* means that this type assertion is valid. */
const typeUrl = message.type_url as AdsTypeUrl;
this.adsState[typeUrl].nonce = message.nonce;
this.adsState[typeUrl].versionInfo = message.version_info;
this.ack(typeUrl);
this.adsState[serviceKind].nonce = message.nonce;
this.adsState[serviceKind].versionInfo = message.version_info;
this.ack(serviceKind);
} else {
trace('Nacking message with type URL ' + message.type_url + ': "' + errorString + '"');
this.nack(message.type_url, errorString);
}
}
private handleAdsCallError(error: ServiceError) {
trace(
'ADS stream ended. code=' + error.code + ' details= ' + error.details
);
this.adsCallV2 = null;
this.adsCallV3 = null;
this.reportStreamError(error);
/* If the backoff timer is no longer running, we do not need to wait any
* more to start the new call. */
if (!this.adsBackoff.isRunning()) {
this.maybeStartAdsStream();
}
}
private maybeStartAdsStreamV2(): boolean {
if (this.apiVersion !== XdsApiVersion.V2) {
return false;
}
if (this.adsClientV2 === null) {
return false;
}
if (this.adsCallV2 !== null) {
return false;
}
this.adsCallV2 = this.adsClientV2.StreamAggregatedResources();
this.adsCallV2.on('data', (message: DiscoveryResponse__Output) => {
this.handleAdsResponse(message);
});
this.adsCallV2.on('error', (error: ServiceError) => {
this.handleAdsCallError(error);
});
return true;
}
private maybeStartAdsStreamV3(): boolean {
if (this.apiVersion !== XdsApiVersion.V3) {
return false;
}
if (this.adsClientV3 === null) {
return false;
}
if (this.adsCallV3 !== null) {
return false;
}
this.adsCallV3 = this.adsClientV3.StreamAggregatedResources();
this.adsCallV3.on('data', (message: DiscoveryResponse__Output) => {
this.handleAdsResponse(message);
});
this.adsCallV3.on('error', (error: ServiceError) => {
this.handleAdsCallError(error);
});
return true;
}
/**
* Start the ADS stream if the client exists and there is not already an
* existing stream, and there
* existing stream, and there are resources to request.
*/
private maybeStartAdsStream() {
if (this.adsClient === null) {
return;
}
if (this.adsCall !== null) {
return;
}
if (this.hasShutdown) {
return;
}
if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) {
if (this.adsState.eds.getResourceNames().length === 0 &&
this.adsState.cds.getResourceNames().length === 0 &&
this.adsState.rds.getResourceNames().length === 0 &&
this.adsState.lds.getResourceNames().length === 0) {
return;
}
trace('Starting ADS stream');
// Backoff relative to when we start the request
this.adsBackoff.runOnce();
this.adsCall = this.adsClient.StreamAggregatedResources();
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
this.handleAdsResponse(message);
});
this.adsCall.on('error', (error: ServiceError) => {
trace(
'ADS stream ended. code=' + error.code + ' details= ' + error.details
);
this.adsCall = null;
this.reportStreamError(error);
/* If the backoff timer is no longer running, we do not need to wait any
* more to start the new call. */
if (!this.adsBackoff.isRunning()) {
this.maybeStartAdsStream();
}
});
let streamStarted: boolean;
if (this.apiVersion === XdsApiVersion.V2) {
streamStarted = this.maybeStartAdsStreamV2();
} else {
streamStarted = this.maybeStartAdsStreamV3();
}
if (streamStarted) {
trace('Started ADS stream');
// Backoff relative to when we start the request
this.adsBackoff.runOnce();
const allTypeUrls: AdsTypeUrl[] = [
EDS_TYPE_URL,
CDS_TYPE_URL,
RDS_TYPE_URL,
LDS_TYPE_URL,
];
for (const typeUrl of allTypeUrls) {
const state = this.adsState[typeUrl];
if (state.getResourceNames().length > 0) {
this.updateNames(typeUrl);
const allServiceKinds: AdsServiceKind[] = ['eds', 'cds', 'rds', 'lds'];
for (const service of allServiceKinds) {
const state = this.adsState[service];
if (state.getResourceNames().length > 0) {
this.updateNames(service);
}
}
}
}
private maybeSendAdsMessage(typeUrl: string, resourceNames: string[], responseNonce: string, versionInfo: string, errorMessage?: string) {
if (this.apiVersion === XdsApiVersion.V2) {
this.adsCallV2?.write({
node: this.adsNodeV2!,
type_url: typeUrl,
resource_names: resourceNames,
response_nonce: responseNonce,
version_info: versionInfo,
error_detail: errorMessage ? { message: errorMessage } : undefined
});
} else {
this.adsCallV3?.write({
node: this.adsNodeV3!,
type_url: typeUrl,
resource_names: resourceNames,
response_nonce: responseNonce,
version_info: versionInfo,
error_detail: errorMessage ? { message: errorMessage } : undefined
});
}
}
private getTypeUrl(serviceKind: AdsServiceKind): AdsTypeUrl {
if (this.apiVersion === XdsApiVersion.V2) {
switch (serviceKind) {
case 'eds':
return EDS_TYPE_URL_V2;
case 'cds':
return CDS_TYPE_URL_V2;
case 'rds':
return RDS_TYPE_URL_V2;
case 'lds':
return LDS_TYPE_URL_V2;
}
} else {
switch (serviceKind) {
case 'eds':
return EDS_TYPE_URL_V3;
case 'cds':
return CDS_TYPE_URL_V3;
case 'rds':
return RDS_TYPE_URL_V3;
case 'lds':
return LDS_TYPE_URL_V3;
}
}
}
@ -487,13 +622,13 @@ export class XdsClient {
* Acknowledge an update. This should be called after the local nonce and
* version info are updated so that it sends the post-update values.
*/
ack(typeUrl: AdsTypeUrl) {
ack(serviceKind: AdsServiceKind) {
/* An ack is the best indication of a successful interaction between the
* client and the server, so we can reset the backoff timer here. */
this.adsBackoff.stop();
this.adsBackoff.reset();
this.updateNames(typeUrl);
this.updateNames(serviceKind);
}
/**
@ -504,135 +639,196 @@ export class XdsClient {
let resourceNames: string[];
let nonce: string;
let versionInfo: string;
let serviceKind: AdsServiceKind | null;
switch (typeUrl) {
case EDS_TYPE_URL:
case CDS_TYPE_URL:
case RDS_TYPE_URL:
case LDS_TYPE_URL:
resourceNames = this.adsState[typeUrl].getResourceNames();
nonce = this.adsState[typeUrl].nonce;
versionInfo = this.adsState[typeUrl].versionInfo;
case EDS_TYPE_URL_V2:
case EDS_TYPE_URL_V3:
serviceKind = 'eds';
break;
case CDS_TYPE_URL_V2:
case CDS_TYPE_URL_V3:
serviceKind = 'cds';
break;
case RDS_TYPE_URL_V2:
case RDS_TYPE_URL_V3:
serviceKind = 'rds';
break;
case LDS_TYPE_URL_V2:
case LDS_TYPE_URL_V3:
serviceKind = 'lds';
break;
default:
resourceNames = [];
nonce = '';
versionInfo = '';
serviceKind = null;
break;
}
this.adsCall?.write({
node: this.adsNode!,
type_url: typeUrl,
resource_names: resourceNames,
response_nonce: nonce,
version_info: versionInfo,
error_detail: {
message: message,
},
});
if (serviceKind) {
resourceNames = this.adsState[serviceKind].getResourceNames();
nonce = this.adsState[serviceKind].nonce;
versionInfo = this.adsState[serviceKind].versionInfo;
} else {
resourceNames = [];
nonce = '';
versionInfo = '';
}
this.maybeSendAdsMessage(typeUrl, resourceNames, nonce, versionInfo, message);
}
private updateNames(typeUrl: AdsTypeUrl) {
if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) {
this.adsCall?.end();
this.lrsCall?.end();
private updateNames(serviceKind: AdsServiceKind) {
if (this.adsState.eds.getResourceNames().length === 0 &&
this.adsState.cds.getResourceNames().length === 0 &&
this.adsState.rds.getResourceNames().length === 0 &&
this.adsState.lds.getResourceNames().length === 0) {
this.adsCallV2?.end();
this.adsCallV2 = null;
this.adsCallV3?.end();
this.adsCallV3 = null;
this.lrsCallV2?.end();
this.lrsCallV2 = null;
this.lrsCallV3?.end();
this.lrsCallV3 = null;
return;
}
this.maybeStartAdsStream();
this.maybeStartLrsStream();
trace('Sending update for type URL ' + typeUrl + ' with names ' + this.adsState[typeUrl].getResourceNames());
this.adsCall?.write({
node: this.adsNode!,
type_url: typeUrl,
resource_names: this.adsState[typeUrl].getResourceNames(),
response_nonce: this.adsState[typeUrl].nonce,
version_info: this.adsState[typeUrl].versionInfo,
});
trace('Sending update for ' + serviceKind + ' with names ' + this.adsState[serviceKind].getResourceNames());
const typeUrl = this.getTypeUrl(serviceKind);
this.maybeSendAdsMessage(typeUrl, this.adsState[serviceKind].getResourceNames(), this.adsState[serviceKind].nonce, this.adsState[serviceKind].versionInfo);
}
private reportStreamError(status: StatusObject) {
this.adsState[EDS_TYPE_URL].reportStreamError(status);
this.adsState[CDS_TYPE_URL].reportStreamError(status);
this.adsState[RDS_TYPE_URL].reportStreamError(status);
this.adsState[LDS_TYPE_URL].reportStreamError(status);
this.adsState.eds.reportStreamError(status);
this.adsState.cds.reportStreamError(status);
this.adsState.rds.reportStreamError(status);
this.adsState.lds.reportStreamError(status);
}
private handleLrsResponse(message: LoadStatsResponse__Output) {
trace('Received LRS response');
/* Once we get any response from the server, we assume that the stream is
* in a good state, so we can reset the backoff timer. */
this.lrsBackoff.stop();
this.lrsBackoff.reset();
if (
!this.receivedLrsSettingsForCurrentStream ||
message.load_reporting_interval?.seconds !==
this.latestLrsSettings?.load_reporting_interval?.seconds ||
message.load_reporting_interval?.nanos !==
this.latestLrsSettings?.load_reporting_interval?.nanos
) {
/* Only reset the timer if the interval has changed or was not set
* before. */
clearInterval(this.statsTimer);
/* Convert a google.protobuf.Duration to a number of milliseconds for
* use with setInterval. */
const loadReportingIntervalMs =
Number.parseInt(message.load_reporting_interval!.seconds) * 1000 +
message.load_reporting_interval!.nanos / 1_000_000;
trace('Received LRS response with load reporting interval ' + loadReportingIntervalMs + ' ms');
this.statsTimer = setInterval(() => {
this.sendStats();
}, loadReportingIntervalMs);
}
this.latestLrsSettings = message;
this.receivedLrsSettingsForCurrentStream = true;
}
private handleLrsCallError(error: ServiceError) {
trace(
'LRS stream ended. code=' + error.code + ' details= ' + error.details
);
this.lrsCallV2 = null;
this.lrsCallV3 = null;
clearInterval(this.statsTimer);
/* If the backoff timer is no longer running, we do not need to wait any
* more to start the new call. */
if (!this.lrsBackoff.isRunning()) {
this.maybeStartLrsStream();
}
}
private maybeStartLrsStreamV2(): boolean {
if (!this.lrsClientV2) {
return false;
}
if (this.lrsCallV2) {
return false;
}
this.lrsCallV2 = this.lrsClientV2.streamLoadStats();
this.receivedLrsSettingsForCurrentStream = false;
this.lrsCallV2.on('data', (message: LoadStatsResponse__Output) => {
this.handleLrsResponse(message);
});
this.lrsCallV2.on('error', (error: ServiceError) => {
this.handleLrsCallError(error);
});
return true;
}
private maybeStartLrsStreamV3(): boolean {
if (!this.lrsClientV3) {
return false;
}
if (this.lrsCallV3) {
return false;
}
this.lrsCallV3 = this.lrsClientV3.streamLoadStats();
this.receivedLrsSettingsForCurrentStream = false;
this.lrsCallV3.on('data', (message: LoadStatsResponse__Output) => {
this.handleLrsResponse(message);
});
this.lrsCallV3.on('error', (error: ServiceError) => {
this.handleLrsCallError(error);
});
return true;
}
private maybeStartLrsStream() {
if (!this.lrsClient) {
return;
}
if (this.lrsCall) {
return;
}
if (this.hasShutdown) {
return;
}
if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) {
if (this.adsState.eds.getResourceNames().length === 0 &&
this.adsState.cds.getResourceNames().length === 0 &&
this.adsState.rds.getResourceNames().length === 0 &&
this.adsState.lds.getResourceNames().length === 0) {
return;
}
trace('Starting LRS stream');
this.lrsBackoff.runOnce();
this.lrsCall = this.lrsClient.streamLoadStats();
let receivedSettingsForThisStream = false;
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
/* Once we get any response from the server, we assume that the stream is
* in a good state, so we can reset the backoff timer. */
this.lrsBackoff.stop();
this.lrsBackoff.reset();
if (
!receivedSettingsForThisStream ||
message.load_reporting_interval?.seconds !==
this.latestLrsSettings?.load_reporting_interval?.seconds ||
message.load_reporting_interval?.nanos !==
this.latestLrsSettings?.load_reporting_interval?.nanos
) {
/* Only reset the timer if the interval has changed or was not set
* before. */
clearInterval(this.statsTimer);
/* Convert a google.protobuf.Duration to a number of milliseconds for
* use with setInterval. */
const loadReportingIntervalMs =
Number.parseInt(message.load_reporting_interval!.seconds) * 1000 +
message.load_reporting_interval!.nanos / 1_000_000;
trace('Received LRS request with load reporting interval ' + loadReportingIntervalMs + ' ms');
this.statsTimer = setInterval(() => {
this.sendStats();
}, loadReportingIntervalMs);
}
this.latestLrsSettings = message;
receivedSettingsForThisStream = true;
});
this.lrsCall.on('error', (error: ServiceError) => {
trace(
'LRS stream ended. code=' + error.code + ' details= ' + error.details
);
this.lrsCall = null;
clearInterval(this.statsTimer);
/* If the backoff timer is no longer running, we do not need to wait any
* more to start the new call. */
if (!this.lrsBackoff.isRunning()) {
this.maybeStartLrsStream();
}
});
/* Send buffered stats information when starting LRS stream. If there is no
* buffered stats information, it will still send the node field. */
this.sendStats();
let streamStarted: boolean;
if (this.apiVersion === XdsApiVersion.V2) {
streamStarted = this.maybeStartLrsStreamV2();
} else {
streamStarted = this.maybeStartLrsStreamV3();
}
if (streamStarted) {
trace('Starting LRS stream');
this.lrsBackoff.runOnce();
/* Send buffered stats information when starting LRS stream. If there is no
* buffered stats information, it will still send the node field. */
this.sendStats();
}
}
private maybeSendLrsMessage(clusterStats: ClusterStats[]) {
if (this.apiVersion === XdsApiVersion.V2) {
this.lrsCallV2?.write({
node: this.lrsNodeV2!,
cluster_stats: clusterStats
});
} else {
this.lrsCallV3?.write({
node: this.lrsNodeV3!,
cluster_stats: clusterStats
});
}
}
private sendStats() {
if (!this.lrsCall) {
if (this.lrsCallV2 === null && this.lrsCallV3 === null) {
return;
}
if (!this.latestLrsSettings) {
this.lrsCall.write({
node: this.lrsNode!,
});
this.maybeSendLrsMessage([]);
return;
}
const clusterStats: ClusterStats[] = [];
@ -664,7 +860,7 @@ export class XdsClient {
localityStats.callsFailed = 0;
}
}
const droppedRequests: _envoy_api_v2_endpoint_ClusterStats_DroppedRequests[] = [];
const droppedRequests: DroppedRequests[] = [];
let totalDroppedRequests = 0;
for (const [category, count] of stats.callsDropped.entries()) {
if (count > 0) {
@ -696,10 +892,7 @@ export class XdsClient {
}
}
trace('Sending LRS stats ' + JSON.stringify(clusterStats, undefined, 2));
this.lrsCall.write({
node: this.lrsNode!,
cluster_stats: clusterStats,
});
this.maybeSendLrsMessage(clusterStats);
}
addEndpointWatcher(
@ -707,7 +900,7 @@ export class XdsClient {
watcher: Watcher<ClusterLoadAssignment__Output>
) {
trace('Watcher added for endpoint ' + edsServiceName);
this.adsState[EDS_TYPE_URL].addWatcher(edsServiceName, watcher);
this.adsState.eds.addWatcher(edsServiceName, watcher);
}
removeEndpointWatcher(
@ -715,37 +908,37 @@ export class XdsClient {
watcher: Watcher<ClusterLoadAssignment__Output>
) {
trace('Watcher removed for endpoint ' + edsServiceName);
this.adsState[EDS_TYPE_URL].removeWatcher(edsServiceName, watcher);
this.adsState.eds.removeWatcher(edsServiceName, watcher);
}
addClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
trace('Watcher added for cluster ' + clusterName);
this.adsState[CDS_TYPE_URL].addWatcher(clusterName, watcher);
this.adsState.cds.addWatcher(clusterName, watcher);
}
removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
trace('Watcher removed for cluster ' + clusterName);
this.adsState[CDS_TYPE_URL].removeWatcher(clusterName, watcher);
this.adsState.cds.removeWatcher(clusterName, watcher);
}
addRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
trace('Watcher added for route ' + routeConfigName);
this.adsState[RDS_TYPE_URL].addWatcher(routeConfigName, watcher);
this.adsState.rds.addWatcher(routeConfigName, watcher);
}
removeRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
trace('Watcher removed for route ' + routeConfigName);
this.adsState[RDS_TYPE_URL].removeWatcher(routeConfigName, watcher);
this.adsState.rds.removeWatcher(routeConfigName, watcher);
}
addListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
trace('Watcher added for listener ' + targetName);
this.adsState[LDS_TYPE_URL].addWatcher(targetName, watcher);
this.adsState.lds.addWatcher(targetName, watcher);
}
removeListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
trace('Watcher removed for listener ' + targetName);
this.adsState[LDS_TYPE_URL].removeWatcher(targetName, watcher);
this.adsState.lds.removeWatcher(targetName, watcher);
}
/**
@ -833,10 +1026,14 @@ export class XdsClient {
}
private shutdown(): void {
this.adsCall?.cancel();
this.adsClient?.close();
this.lrsCall?.cancel();
this.lrsClient?.close();
this.adsCallV2?.cancel();
this.adsCallV3?.cancel();
this.adsClientV2?.close();
this.adsClientV3?.close();
this.lrsCallV2?.cancel();
this.lrsCallV3?.cancel();
this.lrsClientV2?.close();
this.lrsClientV3?.close();
this.hasShutdown = true;
}
}

View File

@ -16,7 +16,7 @@
*/
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { Cluster__Output } from "../generated/envoy/api/v2/Cluster";
import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster";
import { EdsState } from "./eds-state";
import { Watcher, XdsStreamState } from "./xds-stream-state";

View File

@ -17,7 +17,7 @@
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { isIPv4, isIPv6 } from "net";
import { ClusterLoadAssignment__Output } from "../generated/envoy/api/v2/ClusterLoadAssignment";
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
import { Watcher, XdsStreamState } from "./xds-stream-state";
const TRACER_NAME = 'xds_client';

View File

@ -17,10 +17,11 @@
import * as protoLoader from '@grpc/proto-loader';
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { Listener__Output } from "../generated/envoy/api/v2/Listener";
import { Listener__Output } from '../generated/envoy/config/listener/v3/Listener';
import { RdsState } from "./rds-state";
import { Watcher, XdsStreamState } from "./xds-stream-state";
import { HttpConnectionManager__Output } from '../generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
import { HttpConnectionManager__Output } from '../generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager';
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V2, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from '../resources';
const TRACER_NAME = 'xds_client';
@ -28,9 +29,6 @@ function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const HTTP_CONNECTION_MANGER_TYPE_URL =
'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager';
export class LdsState implements XdsStreamState<Listener__Output> {
versionInfo = '';
nonce = '';
@ -95,16 +93,13 @@ export class LdsState implements XdsStreamState<Listener__Output> {
if (
!(
message.api_listener?.api_listener &&
protoLoader.isAnyExtension(message.api_listener.api_listener) &&
message.api_listener?.api_listener['@type'] ===
HTTP_CONNECTION_MANGER_TYPE_URL
(message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL_V2 ||
message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL_V3)
)
) {
return false;
}
const httpConnectionManager = message.api_listener
?.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener.value);
switch (httpConnectionManager.route_specifier) {
case 'rds':
return !!httpConnectionManager.rds?.config_source?.ads;

View File

@ -16,7 +16,7 @@
*/
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { RouteConfiguration__Output } from "../generated/envoy/api/v2/RouteConfiguration";
import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration";
import { CdsLoadBalancingConfig } from "../load-balancer-cds";
import { Watcher, XdsStreamState } from "./xds-stream-state";
import ServiceConfig = experimental.ServiceConfig;

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for Kokoro (in protobuf text format)
# Location of the continuous shell script in repository.
@ -21,4 +20,4 @@ action {
define_artifacts {
regex: "github/grpc-node/reports/**/sponge_log.xml"
}
}
}

View File

@ -0,0 +1,24 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for Kokoro (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc-node/packages/grpc-js-xds/scripts/xds-v3.sh"
timeout_mins: 120
action {
define_artifacts {
regex: "github/grpc/reports/**"
}
}