From 5e28fb3078ee9feeabdf061f83526d626ec772e6 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 17 Jul 2020 10:35:04 -0700 Subject: [PATCH 1/4] grpc-js: Implement EDS load balancer --- packages/grpc-js/src/load-balancer-eds.ts | 254 ++++++++++++++++++ packages/grpc-js/src/load-balancing-config.ts | 32 ++- 2 files changed, 285 insertions(+), 1 deletion(-) create mode 100644 packages/grpc-js/src/load-balancer-eds.ts diff --git a/packages/grpc-js/src/load-balancer-eds.ts b/packages/grpc-js/src/load-balancer-eds.ts new file mode 100644 index 00000000..312d059c --- /dev/null +++ b/packages/grpc-js/src/load-balancer-eds.ts @@ -0,0 +1,254 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { LoadBalancer, ChannelControlHelper, registerLoadBalancerType, getFirstUsableConfig } from "./load-balancer"; +import { SubchannelAddress } from "./subchannel"; +import { LoadBalancingConfig, isEdsLoadBalancingConfig, EdsLoadBalancingConfig, PriorityLbConfig, PriorityChild, WeightedTarget, PriorityLoadBalancingConfig } from "./load-balancing-config"; +import { ChildLoadBalancerHandler } from "./load-balancer-child-handler"; +import { XdsClient, Watcher } from "./xds-client"; +import { ClusterLoadAssignment__Output } from "./generated/envoy/api/v2/ClusterLoadAssignment"; +import { ConnectivityState } from "./channel"; +import { UnavailablePicker } from "./picker"; +import { Locality__Output } from "./generated/envoy/api/v2/core/Locality"; +import { LocalitySubchannelAddress } from "./load-balancer-priority"; +import { Status } from "./constants"; +import { Metadata } from "./metadata"; + +const TYPE_NAME = 'eds'; + +function localityToName(locality: Locality__Output) { + return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`; +} + +/** + * This class load balances over a cluster by making an EDS request and then + * transforming the result into a configuration for another load balancing + * policy. + */ +export class EdsLoadBalancer implements LoadBalancer { + /** + * The child load balancer that will handle balancing the results of the EDS + * requests. + */ + private childBalancer: ChildLoadBalancerHandler; + private xdsClient: XdsClient | null = null; + private edsServiceName: string | null = null; + private watcher: Watcher; + /** + * Indicates whether the watcher has already been passed to this.xdsClient + * and is getting updates. + */ + private isWatcherActive = false; + + private lastestConfig: EdsLoadBalancingConfig | null = null; + private latestAttributes: {[key: string]: unknown} = {}; + private latestEdsUpdate: ClusterLoadAssignment__Output | null = null; + + /** + * The priority of each locality the last time we got an update. + */ + private localityPriorities: Map = new Map(); + /** + * The name we assigned to each priority number the last time we got an + * update. + */ + private priorityNames: string[] = []; + + private nextPriorityChildNumber = 0; + + constructor(private readonly channelControlHelper: ChannelControlHelper) { + this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); + this.watcher = { + onValidUpdate: update => { + this.latestEdsUpdate = update; + this.updateChild(); + }, + onResourceDoesNotExist: () => { + /* TODO(murgatroid99): Figure out what needs to be done here after + * implementing CDS */ + }, + onTransientError: (status) => { + if (this.latestEdsUpdate === null) { + channelControlHelper.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: `xDS request failed with error ${status.details}`, + metadata: new Metadata() + })); + } + } + } + } + + /** + * Should be called when this balancer gets a new config and when the + * XdsClient returns a new ClusterLoadAssignment. + */ + private updateChild() { + if (!(this.lastestConfig && this.latestEdsUpdate)) { + return; + } + /** + * Maps each priority number to the list of localities with that priority, + * and the list of addresses associated with each locality. + */ + const priorityList: {locality: Locality__Output, weight: number, addresses: SubchannelAddress[]}[][] = []; + const newLocalityPriorities: Map = new Map(); + /* We are given a list of localities, each of which has a priority. This + * loop consolidates localities into buckets by priority, while also + * simplifying the data structure to make the later steps simpler */ + for (const endpoint of this.latestEdsUpdate.endpoints) { + let localityArray = priorityList[endpoint.priority]; + if (localityArray === undefined) { + localityArray = []; + priorityList[endpoint.priority] = localityArray; + } + const addresses: SubchannelAddress[] = endpoint.lb_endpoints.map(lbEndpoint => { + /* The validator in the XdsClient class ensures that each endpoint has + * a socket_address with an IP address and a port_value. */ + const socketAddress = lbEndpoint.endpoint!.address.socket_address!; + return {host: socketAddress.address!, port: socketAddress.port_value!}; + }); + localityArray.push({ + locality: endpoint.locality, + addresses: addresses, + weight: endpoint.load_balancing_weight.value + }); + newLocalityPriorities.set(localityToName(endpoint.locality), endpoint.priority); + } + + const newPriorityNames: string[] = []; + const addressList: LocalitySubchannelAddress[] = []; + const priorityChildren: Map = new Map(); + /* The algorithm here is as follows: for each priority we are given, from + * high to low: + * - If the previous mapping had any of the same localities at the same or + * a lower priority, use the matching name from the highest such + * priority, unless the new mapping has already used that name. + * - Otherwise, construct a new name using this.nextPriorityChildNumber. + */ + for (const [priority, localityArray] of priorityList.entries()) { + if (localityArray === undefined) { + continue; + } + /** + * Highest (smallest number) priority value that any of the localities in + * this locality array had a in the previous mapping. + */ + let highestOldPriority = Infinity; + for (const localityObj of localityArray) { + const oldPriority = this.localityPriorities.get(localityToName(localityObj.locality)); + if (oldPriority !== undefined && oldPriority >= priority && oldPriority < highestOldPriority) { + highestOldPriority = oldPriority; + } + } + let newPriorityName: string; + if (highestOldPriority === Infinity) { + /* No existing priority at or below the same number as the priority we + * are looking at had any of the localities in this priority. So, we + * use a new name. */ + newPriorityName = `child${this.nextPriorityChildNumber++}`; + } else { + const newName = this.priorityNames[highestOldPriority]; + if (newPriorityNames.indexOf(newName) < 0) { + newPriorityName = newName; + } else { + newPriorityName = `child${this.nextPriorityChildNumber++}`; + } + } + newPriorityNames[priority] = newPriorityName; + + const childTargets: Map = new Map(); + for (const localityObj of localityArray) { + childTargets.set(localityToName(localityObj.locality), { + weight: localityObj.weight, + /* TODO(murgatroid99): Insert an lrs config around the round_robin + * config after implementing lrs */ + /* Use the endpoint picking policy from the config, default to + * round_robin. */ + child_policy: [...this.lastestConfig.eds.endpointPickingPolicy, {name: 'round_robin', round_robin: {}}] + }); + for (const address of localityObj.addresses) { + addressList.push({ + localityPath: [newPriorityName, localityToName(localityObj.locality)], + ...address + }); + } + } + + priorityChildren.set(newPriorityName, { + config: [{ + name: 'weighted_target', + weighted_target: { + targets: childTargets + } + }] + }); + } + const childConfig: PriorityLoadBalancingConfig = { + name: 'priority', + priority: { + children: priorityChildren, + /* Contract the priority names array if it is sparse. This config only + * cares about the order of priorities, not their specific numbers */ + priorities: newPriorityNames.filter(value => value !== undefined) + } + }; + this.childBalancer.updateAddressList(addressList, childConfig, this.latestAttributes); + + this.localityPriorities = newLocalityPriorities; + this.priorityNames = newPriorityNames; + } + + updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + if (!isEdsLoadBalancingConfig(lbConfig)) { + return; + } + if (!(attributes.xdsClient instanceof XdsClient)) { + return; + } + this.lastestConfig = lbConfig; + this.latestAttributes = attributes; + this.xdsClient = attributes.xdsClient; + this.edsServiceName = lbConfig.eds.edsServiceName ?? lbConfig.eds.cluster; + + if (!this.isWatcherActive) { + this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher); + this.isWatcherActive = true; + } + + this.updateChild(); + } + exitIdle(): void { + this.childBalancer.exitIdle(); + } + resetBackoff(): void { + this.childBalancer.resetBackoff(); + } + destroy(): void { + if (this.edsServiceName) { + this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher); + } + this.childBalancer.destroy(); + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, EdsLoadBalancer); +} \ No newline at end of file diff --git a/packages/grpc-js/src/load-balancing-config.ts b/packages/grpc-js/src/load-balancing-config.ts index 736e3d42..164e1ab4 100644 --- a/packages/grpc-js/src/load-balancing-config.ts +++ b/packages/grpc-js/src/load-balancing-config.ts @@ -57,6 +57,24 @@ export interface WeightedTargetLbConfig { targets: Map; } +export interface EdsLbConfig { + cluster: string; + edsServiceName?: string; + lrsLoadReportingServerName?: string; + /** + * This policy's config is expected to be in the format used by the + * weighted_target policy. Defaults to weighted_target if not specified. + * + * This is currently not used because there is currently no other config + * that has the same format as weighted_target. + */ + localityPickingPolicy: LoadBalancingConfig[]; + /** + * Defaults to round_robin if not specified. + */ + endpointPickingPolicy: LoadBalancingConfig[]; +} + export interface PickFirstLoadBalancingConfig { name: 'pick_first'; pick_first: PickFirstConfig; @@ -87,13 +105,19 @@ export interface WeightedTargetLoadBalancingConfig { weighted_target: WeightedTargetLbConfig; } +export interface EdsLoadBalancingConfig { + name: 'eds'; + eds: EdsLbConfig; +} + export type LoadBalancingConfig = | PickFirstLoadBalancingConfig | RoundRobinLoadBalancingConfig | XdsLoadBalancingConfig | GrpcLbLoadBalancingConfig | PriorityLoadBalancingConfig - | WeightedTargetLoadBalancingConfig; + | WeightedTargetLoadBalancingConfig + | EdsLoadBalancingConfig; export function isRoundRobinLoadBalancingConfig( lbconfig: LoadBalancingConfig @@ -125,6 +149,12 @@ export function isWeightedTargetLoadBalancingConfig( return lbconfig.name === 'weighted_target'; } +export function isEdsLoadBalancingConfig( + lbconfig: LoadBalancingConfig +): lbconfig is EdsLoadBalancingConfig { + return lbconfig.name === 'eds'; +} + /* In these functions we assume the input came from a JSON object. Therefore we * expect that the prototype is uninteresting and that `in` can be used * effectively */ From f061e4e762750edcf1ea5f3c9fc04b87c900b8f3 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 17 Jul 2020 10:37:49 -0700 Subject: [PATCH 2/4] gts fix --- packages/grpc-js/src/load-balancer-eds.ts | 567 ++++++++++++---------- 1 file changed, 313 insertions(+), 254 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-eds.ts b/packages/grpc-js/src/load-balancer-eds.ts index 312d059c..57582849 100644 --- a/packages/grpc-js/src/load-balancer-eds.ts +++ b/packages/grpc-js/src/load-balancer-eds.ts @@ -1,254 +1,313 @@ -/* - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { LoadBalancer, ChannelControlHelper, registerLoadBalancerType, getFirstUsableConfig } from "./load-balancer"; -import { SubchannelAddress } from "./subchannel"; -import { LoadBalancingConfig, isEdsLoadBalancingConfig, EdsLoadBalancingConfig, PriorityLbConfig, PriorityChild, WeightedTarget, PriorityLoadBalancingConfig } from "./load-balancing-config"; -import { ChildLoadBalancerHandler } from "./load-balancer-child-handler"; -import { XdsClient, Watcher } from "./xds-client"; -import { ClusterLoadAssignment__Output } from "./generated/envoy/api/v2/ClusterLoadAssignment"; -import { ConnectivityState } from "./channel"; -import { UnavailablePicker } from "./picker"; -import { Locality__Output } from "./generated/envoy/api/v2/core/Locality"; -import { LocalitySubchannelAddress } from "./load-balancer-priority"; -import { Status } from "./constants"; -import { Metadata } from "./metadata"; - -const TYPE_NAME = 'eds'; - -function localityToName(locality: Locality__Output) { - return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`; -} - -/** - * This class load balances over a cluster by making an EDS request and then - * transforming the result into a configuration for another load balancing - * policy. - */ -export class EdsLoadBalancer implements LoadBalancer { - /** - * The child load balancer that will handle balancing the results of the EDS - * requests. - */ - private childBalancer: ChildLoadBalancerHandler; - private xdsClient: XdsClient | null = null; - private edsServiceName: string | null = null; - private watcher: Watcher; - /** - * Indicates whether the watcher has already been passed to this.xdsClient - * and is getting updates. - */ - private isWatcherActive = false; - - private lastestConfig: EdsLoadBalancingConfig | null = null; - private latestAttributes: {[key: string]: unknown} = {}; - private latestEdsUpdate: ClusterLoadAssignment__Output | null = null; - - /** - * The priority of each locality the last time we got an update. - */ - private localityPriorities: Map = new Map(); - /** - * The name we assigned to each priority number the last time we got an - * update. - */ - private priorityNames: string[] = []; - - private nextPriorityChildNumber = 0; - - constructor(private readonly channelControlHelper: ChannelControlHelper) { - this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); - this.watcher = { - onValidUpdate: update => { - this.latestEdsUpdate = update; - this.updateChild(); - }, - onResourceDoesNotExist: () => { - /* TODO(murgatroid99): Figure out what needs to be done here after - * implementing CDS */ - }, - onTransientError: (status) => { - if (this.latestEdsUpdate === null) { - channelControlHelper.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({ - code: Status.UNAVAILABLE, - details: `xDS request failed with error ${status.details}`, - metadata: new Metadata() - })); - } - } - } - } - - /** - * Should be called when this balancer gets a new config and when the - * XdsClient returns a new ClusterLoadAssignment. - */ - private updateChild() { - if (!(this.lastestConfig && this.latestEdsUpdate)) { - return; - } - /** - * Maps each priority number to the list of localities with that priority, - * and the list of addresses associated with each locality. - */ - const priorityList: {locality: Locality__Output, weight: number, addresses: SubchannelAddress[]}[][] = []; - const newLocalityPriorities: Map = new Map(); - /* We are given a list of localities, each of which has a priority. This - * loop consolidates localities into buckets by priority, while also - * simplifying the data structure to make the later steps simpler */ - for (const endpoint of this.latestEdsUpdate.endpoints) { - let localityArray = priorityList[endpoint.priority]; - if (localityArray === undefined) { - localityArray = []; - priorityList[endpoint.priority] = localityArray; - } - const addresses: SubchannelAddress[] = endpoint.lb_endpoints.map(lbEndpoint => { - /* The validator in the XdsClient class ensures that each endpoint has - * a socket_address with an IP address and a port_value. */ - const socketAddress = lbEndpoint.endpoint!.address.socket_address!; - return {host: socketAddress.address!, port: socketAddress.port_value!}; - }); - localityArray.push({ - locality: endpoint.locality, - addresses: addresses, - weight: endpoint.load_balancing_weight.value - }); - newLocalityPriorities.set(localityToName(endpoint.locality), endpoint.priority); - } - - const newPriorityNames: string[] = []; - const addressList: LocalitySubchannelAddress[] = []; - const priorityChildren: Map = new Map(); - /* The algorithm here is as follows: for each priority we are given, from - * high to low: - * - If the previous mapping had any of the same localities at the same or - * a lower priority, use the matching name from the highest such - * priority, unless the new mapping has already used that name. - * - Otherwise, construct a new name using this.nextPriorityChildNumber. - */ - for (const [priority, localityArray] of priorityList.entries()) { - if (localityArray === undefined) { - continue; - } - /** - * Highest (smallest number) priority value that any of the localities in - * this locality array had a in the previous mapping. - */ - let highestOldPriority = Infinity; - for (const localityObj of localityArray) { - const oldPriority = this.localityPriorities.get(localityToName(localityObj.locality)); - if (oldPriority !== undefined && oldPriority >= priority && oldPriority < highestOldPriority) { - highestOldPriority = oldPriority; - } - } - let newPriorityName: string; - if (highestOldPriority === Infinity) { - /* No existing priority at or below the same number as the priority we - * are looking at had any of the localities in this priority. So, we - * use a new name. */ - newPriorityName = `child${this.nextPriorityChildNumber++}`; - } else { - const newName = this.priorityNames[highestOldPriority]; - if (newPriorityNames.indexOf(newName) < 0) { - newPriorityName = newName; - } else { - newPriorityName = `child${this.nextPriorityChildNumber++}`; - } - } - newPriorityNames[priority] = newPriorityName; - - const childTargets: Map = new Map(); - for (const localityObj of localityArray) { - childTargets.set(localityToName(localityObj.locality), { - weight: localityObj.weight, - /* TODO(murgatroid99): Insert an lrs config around the round_robin - * config after implementing lrs */ - /* Use the endpoint picking policy from the config, default to - * round_robin. */ - child_policy: [...this.lastestConfig.eds.endpointPickingPolicy, {name: 'round_robin', round_robin: {}}] - }); - for (const address of localityObj.addresses) { - addressList.push({ - localityPath: [newPriorityName, localityToName(localityObj.locality)], - ...address - }); - } - } - - priorityChildren.set(newPriorityName, { - config: [{ - name: 'weighted_target', - weighted_target: { - targets: childTargets - } - }] - }); - } - const childConfig: PriorityLoadBalancingConfig = { - name: 'priority', - priority: { - children: priorityChildren, - /* Contract the priority names array if it is sparse. This config only - * cares about the order of priorities, not their specific numbers */ - priorities: newPriorityNames.filter(value => value !== undefined) - } - }; - this.childBalancer.updateAddressList(addressList, childConfig, this.latestAttributes); - - this.localityPriorities = newLocalityPriorities; - this.priorityNames = newPriorityNames; - } - - updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void { - if (!isEdsLoadBalancingConfig(lbConfig)) { - return; - } - if (!(attributes.xdsClient instanceof XdsClient)) { - return; - } - this.lastestConfig = lbConfig; - this.latestAttributes = attributes; - this.xdsClient = attributes.xdsClient; - this.edsServiceName = lbConfig.eds.edsServiceName ?? lbConfig.eds.cluster; - - if (!this.isWatcherActive) { - this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher); - this.isWatcherActive = true; - } - - this.updateChild(); - } - exitIdle(): void { - this.childBalancer.exitIdle(); - } - resetBackoff(): void { - this.childBalancer.resetBackoff(); - } - destroy(): void { - if (this.edsServiceName) { - this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher); - } - this.childBalancer.destroy(); - } - getTypeName(): string { - return TYPE_NAME; - } -} - -export function setup() { - registerLoadBalancerType(TYPE_NAME, EdsLoadBalancer); -} \ No newline at end of file +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + LoadBalancer, + ChannelControlHelper, + registerLoadBalancerType, + getFirstUsableConfig, +} from './load-balancer'; +import { SubchannelAddress } from './subchannel'; +import { + LoadBalancingConfig, + isEdsLoadBalancingConfig, + EdsLoadBalancingConfig, + PriorityLbConfig, + PriorityChild, + WeightedTarget, + PriorityLoadBalancingConfig, +} from './load-balancing-config'; +import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; +import { XdsClient, Watcher } from './xds-client'; +import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment'; +import { ConnectivityState } from './channel'; +import { UnavailablePicker } from './picker'; +import { Locality__Output } from './generated/envoy/api/v2/core/Locality'; +import { LocalitySubchannelAddress } from './load-balancer-priority'; +import { Status } from './constants'; +import { Metadata } from './metadata'; + +const TYPE_NAME = 'eds'; + +function localityToName(locality: Locality__Output) { + return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`; +} + +/** + * This class load balances over a cluster by making an EDS request and then + * transforming the result into a configuration for another load balancing + * policy. + */ +export class EdsLoadBalancer implements LoadBalancer { + /** + * The child load balancer that will handle balancing the results of the EDS + * requests. + */ + private childBalancer: ChildLoadBalancerHandler; + private xdsClient: XdsClient | null = null; + private edsServiceName: string | null = null; + private watcher: Watcher; + /** + * Indicates whether the watcher has already been passed to this.xdsClient + * and is getting updates. + */ + private isWatcherActive = false; + + private lastestConfig: EdsLoadBalancingConfig | null = null; + private latestAttributes: { [key: string]: unknown } = {}; + private latestEdsUpdate: ClusterLoadAssignment__Output | null = null; + + /** + * The priority of each locality the last time we got an update. + */ + private localityPriorities: Map = new Map(); + /** + * The name we assigned to each priority number the last time we got an + * update. + */ + private priorityNames: string[] = []; + + private nextPriorityChildNumber = 0; + + constructor(private readonly channelControlHelper: ChannelControlHelper) { + this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); + this.watcher = { + onValidUpdate: (update) => { + this.latestEdsUpdate = update; + this.updateChild(); + }, + onResourceDoesNotExist: () => { + /* TODO(murgatroid99): Figure out what needs to be done here after + * implementing CDS */ + }, + onTransientError: (status) => { + if (this.latestEdsUpdate === null) { + channelControlHelper.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: `xDS request failed with error ${status.details}`, + metadata: new Metadata(), + }) + ); + } + }, + }; + } + + /** + * Should be called when this balancer gets a new config and when the + * XdsClient returns a new ClusterLoadAssignment. + */ + private updateChild() { + if (!(this.lastestConfig && this.latestEdsUpdate)) { + return; + } + /** + * Maps each priority number to the list of localities with that priority, + * and the list of addresses associated with each locality. + */ + const priorityList: { + locality: Locality__Output; + weight: number; + addresses: SubchannelAddress[]; + }[][] = []; + const newLocalityPriorities: Map = new Map< + string, + number + >(); + /* We are given a list of localities, each of which has a priority. This + * loop consolidates localities into buckets by priority, while also + * simplifying the data structure to make the later steps simpler */ + for (const endpoint of this.latestEdsUpdate.endpoints) { + let localityArray = priorityList[endpoint.priority]; + if (localityArray === undefined) { + localityArray = []; + priorityList[endpoint.priority] = localityArray; + } + const addresses: SubchannelAddress[] = endpoint.lb_endpoints.map( + (lbEndpoint) => { + /* The validator in the XdsClient class ensures that each endpoint has + * a socket_address with an IP address and a port_value. */ + const socketAddress = lbEndpoint.endpoint!.address.socket_address!; + return { + host: socketAddress.address!, + port: socketAddress.port_value!, + }; + } + ); + localityArray.push({ + locality: endpoint.locality, + addresses: addresses, + weight: endpoint.load_balancing_weight.value, + }); + newLocalityPriorities.set( + localityToName(endpoint.locality), + endpoint.priority + ); + } + + const newPriorityNames: string[] = []; + const addressList: LocalitySubchannelAddress[] = []; + const priorityChildren: Map = new Map< + string, + PriorityChild + >(); + /* The algorithm here is as follows: for each priority we are given, from + * high to low: + * - If the previous mapping had any of the same localities at the same or + * a lower priority, use the matching name from the highest such + * priority, unless the new mapping has already used that name. + * - Otherwise, construct a new name using this.nextPriorityChildNumber. + */ + for (const [priority, localityArray] of priorityList.entries()) { + if (localityArray === undefined) { + continue; + } + /** + * Highest (smallest number) priority value that any of the localities in + * this locality array had a in the previous mapping. + */ + let highestOldPriority = Infinity; + for (const localityObj of localityArray) { + const oldPriority = this.localityPriorities.get( + localityToName(localityObj.locality) + ); + if ( + oldPriority !== undefined && + oldPriority >= priority && + oldPriority < highestOldPriority + ) { + highestOldPriority = oldPriority; + } + } + let newPriorityName: string; + if (highestOldPriority === Infinity) { + /* No existing priority at or below the same number as the priority we + * are looking at had any of the localities in this priority. So, we + * use a new name. */ + newPriorityName = `child${this.nextPriorityChildNumber++}`; + } else { + const newName = this.priorityNames[highestOldPriority]; + if (newPriorityNames.indexOf(newName) < 0) { + newPriorityName = newName; + } else { + newPriorityName = `child${this.nextPriorityChildNumber++}`; + } + } + newPriorityNames[priority] = newPriorityName; + + const childTargets: Map = new Map< + string, + WeightedTarget + >(); + for (const localityObj of localityArray) { + childTargets.set(localityToName(localityObj.locality), { + weight: localityObj.weight, + /* TODO(murgatroid99): Insert an lrs config around the round_robin + * config after implementing lrs */ + /* Use the endpoint picking policy from the config, default to + * round_robin. */ + child_policy: [ + ...this.lastestConfig.eds.endpointPickingPolicy, + { name: 'round_robin', round_robin: {} }, + ], + }); + for (const address of localityObj.addresses) { + addressList.push({ + localityPath: [ + newPriorityName, + localityToName(localityObj.locality), + ], + ...address, + }); + } + } + + priorityChildren.set(newPriorityName, { + config: [ + { + name: 'weighted_target', + weighted_target: { + targets: childTargets, + }, + }, + ], + }); + } + const childConfig: PriorityLoadBalancingConfig = { + name: 'priority', + priority: { + children: priorityChildren, + /* Contract the priority names array if it is sparse. This config only + * cares about the order of priorities, not their specific numbers */ + priorities: newPriorityNames.filter((value) => value !== undefined), + }, + }; + this.childBalancer.updateAddressList( + addressList, + childConfig, + this.latestAttributes + ); + + this.localityPriorities = newLocalityPriorities; + this.priorityNames = newPriorityNames; + } + + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + if (!isEdsLoadBalancingConfig(lbConfig)) { + return; + } + if (!(attributes.xdsClient instanceof XdsClient)) { + return; + } + this.lastestConfig = lbConfig; + this.latestAttributes = attributes; + this.xdsClient = attributes.xdsClient; + this.edsServiceName = lbConfig.eds.edsServiceName ?? lbConfig.eds.cluster; + + if (!this.isWatcherActive) { + this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher); + this.isWatcherActive = true; + } + + this.updateChild(); + } + exitIdle(): void { + this.childBalancer.exitIdle(); + } + resetBackoff(): void { + this.childBalancer.resetBackoff(); + } + destroy(): void { + if (this.edsServiceName) { + this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher); + } + this.childBalancer.destroy(); + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, EdsLoadBalancer); +} From 044da58c76b94a392cff2d2c8cba3d960a41ff71 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 22 Jul 2020 16:38:00 -0700 Subject: [PATCH 3/4] Update with changes from xDS Client PR --- packages/grpc-js/src/load-balancer-eds.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-eds.ts b/packages/grpc-js/src/load-balancer-eds.ts index 57582849..73a27bc4 100644 --- a/packages/grpc-js/src/load-balancer-eds.ts +++ b/packages/grpc-js/src/load-balancer-eds.ts @@ -143,7 +143,7 @@ export class EdsLoadBalancer implements LoadBalancer { (lbEndpoint) => { /* The validator in the XdsClient class ensures that each endpoint has * a socket_address with an IP address and a port_value. */ - const socketAddress = lbEndpoint.endpoint!.address.socket_address!; + const socketAddress = lbEndpoint.endpoint!.address!.socket_address!; return { host: socketAddress.address!, port: socketAddress.port_value!, @@ -151,12 +151,12 @@ export class EdsLoadBalancer implements LoadBalancer { } ); localityArray.push({ - locality: endpoint.locality, + locality: endpoint.locality!, addresses: addresses, - weight: endpoint.load_balancing_weight.value, + weight: endpoint.load_balancing_weight?.value ?? 0, }); newLocalityPriorities.set( - localityToName(endpoint.locality), + localityToName(endpoint.locality!), endpoint.priority ); } From ef225cba30efd54027b527ecd6627d0b065d45d8 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 23 Jul 2020 10:08:50 -0700 Subject: [PATCH 4/4] Handle changing EDS service name, add comments --- packages/grpc-js/src/load-balancer-eds.ts | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js/src/load-balancer-eds.ts b/packages/grpc-js/src/load-balancer-eds.ts index 73a27bc4..1bbce0a2 100644 --- a/packages/grpc-js/src/load-balancer-eds.ts +++ b/packages/grpc-js/src/load-balancer-eds.ts @@ -282,13 +282,30 @@ export class EdsLoadBalancer implements LoadBalancer { this.lastestConfig = lbConfig; this.latestAttributes = attributes; this.xdsClient = attributes.xdsClient; - this.edsServiceName = lbConfig.eds.edsServiceName ?? lbConfig.eds.cluster; + const newEdsServiceName = lbConfig.eds.edsServiceName ?? lbConfig.eds.cluster; + + /* If the name is changing, disable the old watcher before adding the new + * one */ + if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) { + this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher); + /* Setting isWatcherActive to false here lets us have one code path for + * calling addEndpointWatcher */ + this.isWatcherActive = false; + /* If we have a new name, the latestEdsUpdate does not correspond to + * the new config, so it is no longer valid */ + this.latestEdsUpdate = null; + } + + this.edsServiceName = newEdsServiceName; if (!this.isWatcherActive) { this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher); this.isWatcherActive = true; } + /* If updateAddressList is called after receiving an update and the update + * is still valid, we want to update the child config with the information + * in the new EdsLoadBalancingConfig. */ this.updateChild(); } exitIdle(): void {