From 248479bc225ca32c2e15897b6c05f88394642f28 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 20 Jul 2020 13:51:50 -0700 Subject: [PATCH 1/3] grpc-js: Implement CDS LB policy --- packages/grpc-js/src/load-balancer-cds.ts | 115 ++++++++++++++ packages/grpc-js/src/load-balancer-eds.ts | 4 +- packages/grpc-js/src/load-balancing-config.ts | 18 ++- packages/grpc-js/src/xds-client.ts | 144 +++++++++++++++++- 4 files changed, 277 insertions(+), 4 deletions(-) create mode 100644 packages/grpc-js/src/load-balancer-cds.ts diff --git a/packages/grpc-js/src/load-balancer-cds.ts b/packages/grpc-js/src/load-balancer-cds.ts new file mode 100644 index 00000000..33fe42c7 --- /dev/null +++ b/packages/grpc-js/src/load-balancer-cds.ts @@ -0,0 +1,115 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + LoadBalancer, + ChannelControlHelper, + registerLoadBalancerType, +} from './load-balancer'; +import { SubchannelAddress } from './subchannel'; +import { LoadBalancingConfig, isCdsLoadBalancingConfig, EdsLbConfig, CdsLoadBalancingConfig } from './load-balancing-config'; +import { XdsClient, Watcher } from './xds-client'; +import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; +import { Cluster__Output } from './generated/envoy/api/v2/Cluster'; +import { ConnectivityState } from './channel'; +import { UnavailablePicker } from './picker'; +import { Status } from './constants'; +import { Metadata } from '.'; + +const TYPE_NAME = 'cds'; + +export class CdsLoadBalancer implements LoadBalancer { + private childBalancer: ChildLoadBalancerHandler; + private xdsClient: XdsClient | null = null; + private watcher: Watcher; + + private isWatcherActive = false; + + private latestCdsUpdate: Cluster__Output | null = null; + + private latestConfig: CdsLoadBalancingConfig | null = null; + private latestAttributes: { [key: string]: unknown } = {}; + + constructor(private readonly channelControlHelper: ChannelControlHelper) { + this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); + this.watcher = { + onValidUpdate: update => { + this.latestCdsUpdate = update; + const edsConfig: EdsLbConfig = { + cluster: update.name, + edsServiceName: update.eds_cluster_config.service_name === '' ? undefined : update.eds_cluster_config.service_name, + localityPickingPolicy: [], + endpointPickingPolicy: [] + // TODO(murgatroid99): populate lrsLoadReportingServerName + } + this.childBalancer.updateAddressList([], {name: 'eds', eds: edsConfig}, this.latestAttributes); + }, + onResourceDoesNotExist: () => { + this.xdsClient?.removeClusterWatcher(this.latestConfig!.cds.cluster, this.watcher); + this.isWatcherActive = false; + }, + onTransientError: status => { + if (this.latestCdsUpdate === null) { + channelControlHelper.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: `xDS request failed with error ${status.details}`, + metadata: new Metadata(), + }) + ); + } + } + }; + } + + updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + if (!isCdsLoadBalancingConfig(lbConfig)) { + return; + } + if (!(attributes.xdsClient instanceof XdsClient)) { + return; + } + this.xdsClient = attributes.xdsClient; + this.latestConfig = lbConfig; + this.latestAttributes = attributes; + + if (!this.isWatcherActive) { + this.xdsClient.addClusterWatcher(lbConfig.cds.cluster, this.watcher); + this.isWatcherActive = true; + } + } + exitIdle(): void { + this.childBalancer.exitIdle(); + } + resetBackoff(): void { + this.childBalancer.resetBackoff(); + } + destroy(): void { + this.childBalancer.destroy(); + if (this.isWatcherActive) { + this.xdsClient?.removeClusterWatcher(this.latestConfig!.cds.cluster, this.watcher); + } + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, CdsLoadBalancer); +} \ No newline at end of file diff --git a/packages/grpc-js/src/load-balancer-eds.ts b/packages/grpc-js/src/load-balancer-eds.ts index 57582849..d02754be 100644 --- a/packages/grpc-js/src/load-balancer-eds.ts +++ b/packages/grpc-js/src/load-balancer-eds.ts @@ -91,8 +91,8 @@ export class EdsLoadBalancer implements LoadBalancer { this.updateChild(); }, onResourceDoesNotExist: () => { - /* TODO(murgatroid99): Figure out what needs to be done here after - * implementing CDS */ + this.xdsClient?.removeEndpointWatcher(this.edsServiceName!, this.watcher); + this.isWatcherActive = false; }, onTransientError: (status) => { if (this.latestEdsUpdate === null) { diff --git a/packages/grpc-js/src/load-balancing-config.ts b/packages/grpc-js/src/load-balancing-config.ts index 164e1ab4..92c4b2b9 100644 --- a/packages/grpc-js/src/load-balancing-config.ts +++ b/packages/grpc-js/src/load-balancing-config.ts @@ -75,6 +75,10 @@ export interface EdsLbConfig { endpointPickingPolicy: LoadBalancingConfig[]; } +export interface CdsLbConfig { + cluster: string; +} + export interface PickFirstLoadBalancingConfig { name: 'pick_first'; pick_first: PickFirstConfig; @@ -110,6 +114,11 @@ export interface EdsLoadBalancingConfig { eds: EdsLbConfig; } +export interface CdsLoadBalancingConfig { + name: 'cds'; + cds: CdsLbConfig; +} + export type LoadBalancingConfig = | PickFirstLoadBalancingConfig | RoundRobinLoadBalancingConfig @@ -117,7 +126,8 @@ export type LoadBalancingConfig = | GrpcLbLoadBalancingConfig | PriorityLoadBalancingConfig | WeightedTargetLoadBalancingConfig - | EdsLoadBalancingConfig; + | EdsLoadBalancingConfig + | CdsLoadBalancingConfig; export function isRoundRobinLoadBalancingConfig( lbconfig: LoadBalancingConfig @@ -155,6 +165,12 @@ export function isEdsLoadBalancingConfig( return lbconfig.name === 'eds'; } +export function isCdsLoadBalancingConfig( + lbconfig: LoadBalancingConfig +): lbconfig is CdsLoadBalancingConfig { + return lbconfig.name === 'cds'; +} + /* In these functions we assume the input came from a JSON object. Therefore we * expect that the prototype is uninteresting and that `in` can be used * effectively */ diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index 98454817..c3e4e49f 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -33,6 +33,7 @@ import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/disc import { DiscoveryRequest } from './generated/envoy/api/v2/DiscoveryRequest'; import { DiscoveryResponse__Output } from './generated/envoy/api/v2/DiscoveryResponse'; import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment'; +import { Cluster__Output } from './generated/envoy/api/v2/Cluster'; const TRACER_NAME = 'xds_client'; @@ -43,6 +44,7 @@ function trace(text: string): void { const clientVersion = require('../../package.json').version; const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment'; +const CDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Cluster' let loadedProtos: Promise | null = null; @@ -105,6 +107,10 @@ export class XdsClient { > = new Map[]>(); private lastEdsVersionInfo = ''; private lastEdsNonce = ''; + + private clusterWatchers: Map[]> = new Map[]>(); + private lastCdsVersionInfo = ''; + private lastCdsNonce = ''; constructor( private targetName: string, @@ -209,6 +215,36 @@ export class XdsClient { this.ackEds(); break; } + case CDS_TYPE_URL: + const cdsResponses: Cluster__Output[] = []; + for (const resource of message.resources) { + if ( + protoLoader.isAnyExtension(resource) && + resource['@type'] === CDS_TYPE_URL + ) { + const resp = resource as protoLoader.AnyExtension & Cluster__Output; + if (!this.validateCdsResponse(resp)) { + this.nackCds('Cluster validation failed'); + return; + } + } else { + this.nackEds( + `Invalid resource type ${ + protoLoader.isAnyExtension(resource) + ? resource['@type'] + : resource.type_url + }` + ); + return; + } + } + for (const message of cdsResponses) { + this.handleCdsResponse(message); + } + this.lastCdsVersionInfo = message.version_info; + this.lastCdsNonce = message.nonce; + this.ackCds(); + break; default: this.nackUnknown( message.type_url, @@ -270,6 +306,19 @@ export class XdsClient { }); } + private ackCds() { + if (!this.adsCall) { + return; + } + this.adsCall.write({ + node: this.node!, + type_url: CDS_TYPE_URL, + resource_names: Array.from(this.clusterWatchers.keys()), + response_nonce: this.lastCdsNonce, + version_info: this.lastCdsVersionInfo, + }); + } + /** * Reject an EDS update. This should be called without updating the local * nonce and version info. @@ -290,6 +339,22 @@ export class XdsClient { }); } + private nackCds(message: string) { + if (!this.adsCall) { + return; + } + this.adsCall.write({ + node: this.node!, + type_url: CDS_TYPE_URL, + resource_names: Array.from(this.clusterWatchers.keys()), + response_nonce: this.lastCdsNonce, + version_info: this.lastCdsVersionInfo, + error_detail: { + message, + }, + }); + } + /** * Validate the ClusterLoadAssignment object by these rules: * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto @@ -313,6 +378,24 @@ export class XdsClient { return true; } + private validateCdsResponse(message: Cluster__Output): boolean { + if (message.type !== 'EDS') { + return false; + } + if (!message.eds_cluster_config.eds_config.ads) { + return false; + } + if (message.lb_policy !== 'ROUND_ROBIN') { + return false; + } + if (message.lrs_server) { + if (!message.lrs_server.self) { + return false; + } + } + return true; + } + private handleEdsResponse(message: ClusterLoadAssignment__Output) { const watchers = this.endpointWatchers.get(message.cluster_name) ?? []; for (const watcher of watchers) { @@ -320,6 +403,13 @@ export class XdsClient { } } + private handleCdsResponse(message: Cluster__Output) { + const watchers = this.clusterWatchers.get(message.name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + private updateEdsNames() { if (this.adsCall) { this.adsCall.write({ @@ -332,8 +422,20 @@ export class XdsClient { } } + private updateCdsNames() { + if (this.adsCall) { + this.adsCall.write({ + node: this.node!, + type_url: CDS_TYPE_URL, + resource_names: Array.from(this.clusterWatchers.keys()), + response_nonce: this.lastCdsNonce, + version_info: this.lastCdsVersionInfo, + }); + } + } + private reportStreamError(status: StatusObject) { - for (const watcherList of this.endpointWatchers.values()) { + for (const watcherList of [...this.endpointWatchers.values(), ...this.clusterWatchers.values()]) { for (const watcher of watcherList) { watcher.onTransientError(status); } @@ -381,6 +483,46 @@ export class XdsClient { } } + addClusterWatcher( + clusterName: string, + watcher: Watcher + ) { + trace('Watcher added for cluster ' + clusterName); + let watchersEntry = this.clusterWatchers.get(clusterName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.clusterWatchers.set(clusterName, watchersEntry); + } + watchersEntry.push(watcher); + if (addedServiceName) { + this.updateCdsNames(); + } + } + + removeClusterWatcher( + clusterName: string, + watcher: Watcher + ) { + trace('Watcher removed for endpoint ' + clusterName); + const watchersEntry = this.clusterWatchers.get(clusterName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.endpointWatchers.delete(clusterName); + } + } + if (removedServiceName) { + this.updateCdsNames(); + } + } + shutdown(): void { this.adsCall?.cancel(); this.client?.close(); From c9074b634cea34fe69225afc74124c8c02d89acd Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 23 Jul 2020 14:21:53 -0700 Subject: [PATCH 2/3] Finish implementing the CDS load balancer --- packages/grpc-js/src/load-balancer-cds.ts | 277 +++++++++++++--------- packages/grpc-js/src/load-balancer-eds.ts | 12 +- packages/grpc-js/src/load-balancer.ts | 4 + packages/grpc-js/src/xds-client.ts | 46 ++-- 4 files changed, 205 insertions(+), 134 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-cds.ts b/packages/grpc-js/src/load-balancer-cds.ts index 33fe42c7..7e7bfe17 100644 --- a/packages/grpc-js/src/load-balancer-cds.ts +++ b/packages/grpc-js/src/load-balancer-cds.ts @@ -1,115 +1,162 @@ -/* - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { - LoadBalancer, - ChannelControlHelper, - registerLoadBalancerType, -} from './load-balancer'; -import { SubchannelAddress } from './subchannel'; -import { LoadBalancingConfig, isCdsLoadBalancingConfig, EdsLbConfig, CdsLoadBalancingConfig } from './load-balancing-config'; -import { XdsClient, Watcher } from './xds-client'; -import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; -import { Cluster__Output } from './generated/envoy/api/v2/Cluster'; -import { ConnectivityState } from './channel'; -import { UnavailablePicker } from './picker'; -import { Status } from './constants'; -import { Metadata } from '.'; - -const TYPE_NAME = 'cds'; - -export class CdsLoadBalancer implements LoadBalancer { - private childBalancer: ChildLoadBalancerHandler; - private xdsClient: XdsClient | null = null; - private watcher: Watcher; - - private isWatcherActive = false; - - private latestCdsUpdate: Cluster__Output | null = null; - - private latestConfig: CdsLoadBalancingConfig | null = null; - private latestAttributes: { [key: string]: unknown } = {}; - - constructor(private readonly channelControlHelper: ChannelControlHelper) { - this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); - this.watcher = { - onValidUpdate: update => { - this.latestCdsUpdate = update; - const edsConfig: EdsLbConfig = { - cluster: update.name, - edsServiceName: update.eds_cluster_config.service_name === '' ? undefined : update.eds_cluster_config.service_name, - localityPickingPolicy: [], - endpointPickingPolicy: [] - // TODO(murgatroid99): populate lrsLoadReportingServerName - } - this.childBalancer.updateAddressList([], {name: 'eds', eds: edsConfig}, this.latestAttributes); - }, - onResourceDoesNotExist: () => { - this.xdsClient?.removeClusterWatcher(this.latestConfig!.cds.cluster, this.watcher); - this.isWatcherActive = false; - }, - onTransientError: status => { - if (this.latestCdsUpdate === null) { - channelControlHelper.updateState( - ConnectivityState.TRANSIENT_FAILURE, - new UnavailablePicker({ - code: Status.UNAVAILABLE, - details: `xDS request failed with error ${status.details}`, - metadata: new Metadata(), - }) - ); - } - } - }; - } - - updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void { - if (!isCdsLoadBalancingConfig(lbConfig)) { - return; - } - if (!(attributes.xdsClient instanceof XdsClient)) { - return; - } - this.xdsClient = attributes.xdsClient; - this.latestConfig = lbConfig; - this.latestAttributes = attributes; - - if (!this.isWatcherActive) { - this.xdsClient.addClusterWatcher(lbConfig.cds.cluster, this.watcher); - this.isWatcherActive = true; - } - } - exitIdle(): void { - this.childBalancer.exitIdle(); - } - resetBackoff(): void { - this.childBalancer.resetBackoff(); - } - destroy(): void { - this.childBalancer.destroy(); - if (this.isWatcherActive) { - this.xdsClient?.removeClusterWatcher(this.latestConfig!.cds.cluster, this.watcher); - } - } - getTypeName(): string { - return TYPE_NAME; - } -} - -export function setup() { - registerLoadBalancerType(TYPE_NAME, CdsLoadBalancer); -} \ No newline at end of file +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + LoadBalancer, + ChannelControlHelper, + registerLoadBalancerType, +} from './load-balancer'; +import { SubchannelAddress } from './subchannel'; +import { + LoadBalancingConfig, + isCdsLoadBalancingConfig, + EdsLbConfig, + CdsLoadBalancingConfig, +} from './load-balancing-config'; +import { XdsClient, Watcher } from './xds-client'; +import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; +import { Cluster__Output } from './generated/envoy/api/v2/Cluster'; +import { ConnectivityState } from './channel'; +import { UnavailablePicker } from './picker'; +import { Status } from './constants'; +import { Metadata } from '.'; + +const TYPE_NAME = 'cds'; + +export class CdsLoadBalancer implements LoadBalancer { + private childBalancer: ChildLoadBalancerHandler; + private xdsClient: XdsClient | null = null; + private watcher: Watcher; + + private isWatcherActive = false; + + private latestCdsUpdate: Cluster__Output | null = null; + + private latestConfig: CdsLoadBalancingConfig | null = null; + private latestAttributes: { [key: string]: unknown } = {}; + + constructor(private readonly channelControlHelper: ChannelControlHelper) { + this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); + this.watcher = { + onValidUpdate: (update) => { + this.latestCdsUpdate = update; + const edsConfig: EdsLbConfig = { + cluster: update.name, + edsServiceName: + update.eds_cluster_config!.service_name === '' + ? undefined + : update.eds_cluster_config!.service_name, + localityPickingPolicy: [], + endpointPickingPolicy: [], + }; + if (update.lrs_server?.self) { + /* the lrs_server.self field indicates that the same server should be + * used for load reporting as for other xDS operations. Setting + * lrsLoadReportingServerName to the empty string sets that behavior. + * Otherwise, if the field is omitted, load reporting is disabled. */ + edsConfig.lrsLoadReportingServerName = ''; + } + this.childBalancer.updateAddressList( + [], + { name: 'eds', eds: edsConfig }, + this.latestAttributes + ); + }, + onResourceDoesNotExist: () => { + this.xdsClient?.removeClusterWatcher( + this.latestConfig!.cds.cluster, + this.watcher + ); + this.isWatcherActive = false; + }, + onTransientError: (status) => { + if (this.latestCdsUpdate === null) { + channelControlHelper.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: `xDS request failed with error ${status.details}`, + metadata: new Metadata(), + }) + ); + } + }, + }; + } + + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + if (!isCdsLoadBalancingConfig(lbConfig)) { + return; + } + if (!(attributes.xdsClient instanceof XdsClient)) { + return; + } + this.xdsClient = attributes.xdsClient; + this.latestAttributes = attributes; + + /* If the cluster is changing, disable the old watcher before adding the new + * one */ + if ( + this.isWatcherActive && + this.latestConfig?.cds.cluster !== lbConfig.cds.cluster + ) { + this.xdsClient.removeClusterWatcher( + this.latestConfig!.cds.cluster, + this.watcher + ); + /* Setting isWatcherActive to false here lets us have one code path for + * calling addClusterWatcher */ + this.isWatcherActive = false; + /* If we have a new name, the latestCdsUpdate does not correspond to + * the new config, so it is no longer valid */ + this.latestCdsUpdate = null; + } + + this.latestConfig = lbConfig; + + if (!this.isWatcherActive) { + this.xdsClient.addClusterWatcher(lbConfig.cds.cluster, this.watcher); + this.isWatcherActive = true; + } + } + exitIdle(): void { + this.childBalancer.exitIdle(); + } + resetBackoff(): void { + this.childBalancer.resetBackoff(); + } + destroy(): void { + this.childBalancer.destroy(); + if (this.isWatcherActive) { + this.xdsClient?.removeClusterWatcher( + this.latestConfig!.cds.cluster, + this.watcher + ); + } + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, CdsLoadBalancer); +} diff --git a/packages/grpc-js/src/load-balancer-eds.ts b/packages/grpc-js/src/load-balancer-eds.ts index 5dbbfd70..24b464af 100644 --- a/packages/grpc-js/src/load-balancer-eds.ts +++ b/packages/grpc-js/src/load-balancer-eds.ts @@ -91,7 +91,10 @@ export class EdsLoadBalancer implements LoadBalancer { this.updateChild(); }, onResourceDoesNotExist: () => { - this.xdsClient?.removeEndpointWatcher(this.edsServiceName!, this.watcher); + this.xdsClient?.removeEndpointWatcher( + this.edsServiceName!, + this.watcher + ); this.isWatcherActive = false; }, onTransientError: (status) => { @@ -282,17 +285,18 @@ export class EdsLoadBalancer implements LoadBalancer { this.lastestConfig = lbConfig; this.latestAttributes = attributes; this.xdsClient = attributes.xdsClient; - const newEdsServiceName = lbConfig.eds.edsServiceName ?? lbConfig.eds.cluster; + const newEdsServiceName = + lbConfig.eds.edsServiceName ?? lbConfig.eds.cluster; /* If the name is changing, disable the old watcher before adding the new * one */ if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) { this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher); /* Setting isWatcherActive to false here lets us have one code path for - * calling addEndpointWatcher */ + * calling addEndpointWatcher */ this.isWatcherActive = false; /* If we have a new name, the latestEdsUpdate does not correspond to - * the new config, so it is no longer valid */ + * the new config, so it is no longer valid */ this.latestEdsUpdate = null; } diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index 9a1c1fcd..227bbe9c 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -24,6 +24,8 @@ import * as load_balancer_pick_first from './load-balancer-pick-first'; import * as load_balancer_round_robin from './load-balancer-round-robin'; import * as load_balancer_priority from './load-balancer-priority'; import * as load_balancer_weighted_target from './load-balancer-weighted-target'; +import * as load_balancer_eds from './load-balancer-eds'; +import * as load_balancer_cds from './load-balancer-cds'; /** * A collection of functions associated with a channel that a load balancer @@ -141,4 +143,6 @@ export function registerAll() { load_balancer_round_robin.setup(); load_balancer_priority.setup(); load_balancer_weighted_target.setup(); + load_balancer_eds.setup(); + load_balancer_cds.setup(); } diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index 54d059e2..98bafda3 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -44,7 +44,7 @@ function trace(text: string): void { const clientVersion = require('../../package.json').version; const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment'; -const CDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Cluster' +const CDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Cluster'; let loadedProtos: Promise | null = null; @@ -108,10 +108,14 @@ export class XdsClient { private lastEdsVersionInfo = ''; private lastEdsNonce = ''; private latestEdsResponses: ClusterLoadAssignment__Output[] = []; - - private clusterWatchers: Map[]> = new Map[]>(); + + private clusterWatchers: Map[]> = new Map< + string, + Watcher[] + >(); private lastCdsVersionInfo = ''; private lastCdsNonce = ''; + private latestCdsResponses: Cluster__Output[] = []; constructor( private targetName: string, @@ -217,14 +221,15 @@ export class XdsClient { this.ackEds(); break; } - case CDS_TYPE_URL: + case CDS_TYPE_URL: { const cdsResponses: Cluster__Output[] = []; for (const resource of message.resources) { if ( protoLoader.isAnyExtension(resource) && resource['@type'] === CDS_TYPE_URL ) { - const resp = resource as protoLoader.AnyExtension & Cluster__Output; + const resp = resource as protoLoader.AnyExtension & + Cluster__Output; if (!this.validateCdsResponse(resp)) { this.nackCds('Cluster validation failed'); return; @@ -245,8 +250,10 @@ export class XdsClient { } this.lastCdsVersionInfo = message.version_info; this.lastCdsNonce = message.nonce; + this.latestCdsResponses = cdsResponses; this.ackCds(); break; + } default: this.nackUnknown( message.type_url, @@ -384,7 +391,7 @@ export class XdsClient { if (message.type !== 'EDS') { return false; } - if (!message.eds_cluster_config.eds_config.ads) { + if (!message.eds_cluster_config?.eds_config?.ads) { return false; } if (message.lb_policy !== 'ROUND_ROBIN') { @@ -437,7 +444,10 @@ export class XdsClient { } private reportStreamError(status: StatusObject) { - for (const watcherList of [...this.endpointWatchers.values(), ...this.clusterWatchers.values()]) { + for (const watcherList of [ + ...this.endpointWatchers.values(), + ...this.clusterWatchers.values(), + ]) { for (const watcher of watcherList) { watcher.onTransientError(status); } @@ -497,10 +507,7 @@ export class XdsClient { } } - addClusterWatcher( - clusterName: string, - watcher: Watcher - ) { + addClusterWatcher(clusterName: string, watcher: Watcher) { trace('Watcher added for cluster ' + clusterName); let watchersEntry = this.clusterWatchers.get(clusterName); let addedServiceName = false; @@ -513,12 +520,21 @@ export class XdsClient { if (addedServiceName) { this.updateCdsNames(); } + + /* If we have already received an update for the requested clusterName, + * immediately pass that update along to the watcher */ + for (const message of this.latestCdsResponses) { + if (message.name === clusterName) { + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + watcher.onValidUpdate(message); + }); + } + } } - removeClusterWatcher( - clusterName: string, - watcher: Watcher - ) { + removeClusterWatcher(clusterName: string, watcher: Watcher) { trace('Watcher removed for endpoint ' + clusterName); const watchersEntry = this.clusterWatchers.get(clusterName); let removedServiceName = false; From fbf2a487f16024965cc67ccd34f234aad8fee662 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 24 Jul 2020 09:56:33 -0700 Subject: [PATCH 3/3] Fix Metadata import path --- packages/grpc-js/src/load-balancer-cds.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/src/load-balancer-cds.ts b/packages/grpc-js/src/load-balancer-cds.ts index 7e7bfe17..82346978 100644 --- a/packages/grpc-js/src/load-balancer-cds.ts +++ b/packages/grpc-js/src/load-balancer-cds.ts @@ -33,7 +33,7 @@ import { Cluster__Output } from './generated/envoy/api/v2/Cluster'; import { ConnectivityState } from './channel'; import { UnavailablePicker } from './picker'; import { Status } from './constants'; -import { Metadata } from '.'; +import { Metadata } from './metadata'; const TYPE_NAME = 'cds';