From 4361242eb90959fcf23830ea2c7f985018ba7bf0 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 20 May 2020 14:00:05 -0700 Subject: [PATCH 1/3] Add priority load balancer --- .../grpc-js/src/load-balancer-priority.ts | 457 ++++++++++++++++++ packages/grpc-js/src/load-balancer.ts | 2 + 2 files changed, 459 insertions(+) create mode 100644 packages/grpc-js/src/load-balancer-priority.ts diff --git a/packages/grpc-js/src/load-balancer-priority.ts b/packages/grpc-js/src/load-balancer-priority.ts new file mode 100644 index 00000000..94f7f560 --- /dev/null +++ b/packages/grpc-js/src/load-balancer-priority.ts @@ -0,0 +1,457 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + LoadBalancer, + ChannelControlHelper, + getFirstUsableConfig, + registerLoadBalancerType, +} from './load-balancer'; +import { SubchannelAddress } from './subchannel'; +import { + LoadBalancingConfig, + isPriorityLoadBalancingConfig, +} from './load-balancing-config'; +import { ConnectivityState } from './channel'; +import { Picker, QueuePicker, UnavailablePicker } from './picker'; +import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; +import { ChannelOptions } from './channel-options'; +import { Status } from './constants'; +import { Metadata } from './metadata'; + +const TYPE_NAME = 'priority'; + +const DEFAULT_FAILOVER_TIME_MS = 10_000; +const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000; + +export type LocalitySubchannelAddress = SubchannelAddress & { + localityPath: string[]; +}; + +export function isLocalitySubchannelAddress( + address: SubchannelAddress +): address is LocalitySubchannelAddress { + return Array.isArray((address as LocalitySubchannelAddress).localityPath); +} + +interface PriorityChildBalancer { + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void; + exitIdle(): void; + resetBackoff(): void; + deactivate(): void; + maybeReactivate(): void; + cancelFailoverTimer(): void; + isFailoverTimerPending(): boolean; + getConnectivityState(): ConnectivityState; + getPicker(): Picker; + getName(): string; + destroy(): void; +} + +interface UpdateArgs { + subchannelAddress: SubchannelAddress[]; + lbConfig: LoadBalancingConfig; +} + +export class PriorityLoadBalancer implements LoadBalancer { + /** + * Inner class for holding a child priority and managing associated timers. + */ + private PriorityChildImpl = class implements PriorityChildBalancer { + private connectivityState: ConnectivityState = ConnectivityState.IDLE; + private picker: Picker; + private childBalancer: ChildLoadBalancerHandler; + private failoverTimer: NodeJS.Timer | null = null; + private deactivationTimer: NodeJS.Timer | null = null; + constructor(private parent: PriorityLoadBalancer, private name: string) { + this.childBalancer = new ChildLoadBalancerHandler({ + createSubchannel: ( + subchannelAddress: SubchannelAddress, + subchannelArgs: ChannelOptions + ) => { + return this.parent.channelControlHelper.createSubchannel( + subchannelAddress, + subchannelArgs + ); + }, + updateState: (connectivityState: ConnectivityState, picker: Picker) => { + this.updateState(connectivityState, picker); + }, + requestReresolution: () => { + this.parent.channelControlHelper.requestReresolution(); + }, + }); + this.picker = new QueuePicker(this.childBalancer); + + this.deactivationTimer = setTimeout(() => {}, 0); + clearTimeout(this.deactivationTimer); + } + + private updateState(connectivityState: ConnectivityState, picker: Picker) { + this.connectivityState = connectivityState; + this.picker = picker; + this.parent.onChildStateChange(this); + } + + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + this.childBalancer.updateAddressList(addressList, lbConfig, attributes); + this.failoverTimer = setTimeout(() => { + this.failoverTimer = null; + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); + }, DEFAULT_FAILOVER_TIME_MS); + } + + exitIdle() { + this.childBalancer.exitIdle(); + } + + resetBackoff() { + this.childBalancer.resetBackoff(); + } + + deactivate() { + if (this.deactivationTimer === null) { + this.deactivationTimer = setTimeout(() => { + this.parent.deleteChild(this); + this.childBalancer.destroy(); + }, DEFAULT_RETENTION_INTERVAL_MS); + } + } + + maybeReactivate() { + if (this.deactivationTimer !== null) { + clearTimeout(this.deactivationTimer); + this.deactivationTimer = null; + } + } + + cancelFailoverTimer() { + if (this.failoverTimer !== null) { + clearTimeout(this.failoverTimer); + this.failoverTimer = null; + } + } + + isFailoverTimerPending() { + return this.failoverTimer !== null; + } + + getConnectivityState() { + return this.connectivityState; + } + + getPicker() { + return this.picker; + } + + getName() { + return this.name; + } + + destroy() { + this.childBalancer.destroy(); + } + }; + // End of inner class PriorityChildImpl + + private children: Map = new Map< + string, + PriorityChildBalancer + >(); + /** + * The priority order of child names from the latest config update. + */ + private priorities: string[] = []; + /** + * The attributes object from the latest update, saved to be passed along to + * each new child as they are created + */ + private latestAttributes: { [key: string]: unknown } = {}; + /** + * The latest load balancing policies and address lists for each child from + * the latest update + */ + private latestUpdates: Map = new Map< + string, + UpdateArgs + >(); + /** + * Current chosen priority that requests are sent to + */ + private currentPriority: number | null = null; + /** + * After an update, this preserves the currently selected child from before + * the update. We continue to use that child until it disconnects, or + * another higher-priority child connects, or it is deleted because it is not + * in the new priority list at all and its retention interval has expired, or + * we try and fail to connect to every child in the new priority list. + */ + private currentChildFromBeforeUpdate: PriorityChildBalancer | null = null; + + constructor(private channelControlHelper: ChannelControlHelper) {} + + private onChildStateChange(child: PriorityChildBalancer) { + const childState = child.getConnectivityState(); + if (child === this.currentChildFromBeforeUpdate) { + if ( + childState === ConnectivityState.READY || + childState === ConnectivityState.IDLE + ) { + this.channelControlHelper.updateState(childState, child.getPicker()); + } else { + this.currentChildFromBeforeUpdate = null; + this.tryNextPriority(true); + } + return; + } + const childPriority = this.priorities.indexOf(child.getName()); + if (childPriority < 0) { + // child is not in the priority list, ignore updates + return; + } + if (this.currentPriority !== null && childPriority > this.currentPriority) { + // child is lower priority than the currently selected child, ignore updates + return; + } + if (childState === ConnectivityState.TRANSIENT_FAILURE) { + /* Report connecting if and only if the currently selected child is the + * one entering TRANSIENT_FAILURE */ + this.tryNextPriority(childPriority === this.currentPriority); + return; + } + if (this.currentPriority === null || childPriority < this.currentPriority) { + /* In this case, either there is no currently selected child or this + * child is higher priority than the currently selected child, so we want + * to switch to it if it is READY or IDLE. */ + if ( + childState === ConnectivityState.READY || + childState === ConnectivityState.IDLE + ) { + this.selectPriority(childPriority); + } + return; + } + /* The currently selected child has updated state to something other than + * TRANSIENT_FAILURE, so we pass that update along */ + this.channelControlHelper.updateState(childState, child.getPicker()); + } + + private deleteChild(child: PriorityChildBalancer) { + if (child === this.currentChildFromBeforeUpdate) { + this.currentChildFromBeforeUpdate = null; + /* If we get to this point, the currentChildFromBeforeUpdate was still in + * use, so we are still trying to connect to the specified priorities */ + this.tryNextPriority(true); + } + } + + /** + * Select the child at the specified priority, and report that child's state + * as this balancer's state until that child disconnects or a higher-priority + * child connects. + * @param priority + */ + private selectPriority(priority: number) { + this.currentPriority = priority; + const chosenChild = this.children.get(this.priorities[priority])!; + this.channelControlHelper.updateState( + chosenChild.getConnectivityState(), + chosenChild.getPicker() + ); + this.currentChildFromBeforeUpdate = null; + // Deactivate each child of lower priority than the chosen child + for (let i = priority + 1; i < this.priorities.length; i++) { + this.children.get(this.priorities[i])?.deactivate(); + } + } + + /** + * Check each child in priority order until we find one to use + * @param reportConnecting Whether we should report a CONNECTING state if we + * stop before picking a specific child. This should be true when we have + * not already selected a child. + */ + private tryNextPriority(reportConnecting: boolean) { + for (const [index, childName] of this.priorities.entries()) { + let child = this.children.get(childName); + /* If the child doesn't already exist, create it and update it. */ + if (child === undefined) { + if (reportConnecting) { + this.channelControlHelper.updateState( + ConnectivityState.CONNECTING, + new QueuePicker(this) + ); + } + child = new this.PriorityChildImpl(this, childName); + this.children.set(childName, child); + const childUpdate = this.latestUpdates.get(childName); + if (childUpdate !== undefined) { + child.updateAddressList( + childUpdate.subchannelAddress, + childUpdate.lbConfig, + this.latestAttributes + ); + } + } + /* We're going to try to use this child, so reactivate it if it has been + * deactivated */ + child.maybeReactivate(); + if ( + child.getConnectivityState() === ConnectivityState.READY || + child.getConnectivityState() === ConnectivityState.IDLE + ) { + this.selectPriority(index); + return; + } + if (child.isFailoverTimerPending()) { + /* This child is still trying to connect. Wait until its failover timer + * has ended to continue to the next one */ + if (reportConnecting) { + this.channelControlHelper.updateState( + ConnectivityState.CONNECTING, + new QueuePicker(this) + ); + } + return; + } + } + this.currentPriority = null; + this.currentChildFromBeforeUpdate = null; + this.channelControlHelper.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: 'No ready priority', + metadata: new Metadata(), + }) + ); + } + + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + if (!isPriorityLoadBalancingConfig(lbConfig)) { + // Reject a config of the wrong type + return; + } + const priorityConfig = lbConfig.priority; + /* For each address, the first element of its localityPath array determines + * which child it belongs to. So we bucket those addresses by that first + * element, and pass along the rest of the localityPath for that child + * to use. */ + const childAddressMap: Map = new Map< + string, + LocalitySubchannelAddress[] + >(); + for (const address of addressList) { + if (!isLocalitySubchannelAddress(address)) { + // Reject address that cannot be prioritized + return; + } + if (address.localityPath.length < 1) { + // Reject address that cannot be prioritized + return; + } + const childName = address.localityPath[0]; + const childAddress: LocalitySubchannelAddress = { + ...address, + localityPath: address.localityPath.slice(1), + }; + let childAddressList = childAddressMap.get(childName); + if (childAddressList === undefined) { + childAddressList = []; + childAddressMap.set(childName, childAddressList); + } + childAddressList.push(childAddress); + } + if (this.currentPriority !== null) { + this.currentChildFromBeforeUpdate = this.children.get( + this.priorities[this.currentPriority] + )!; + this.currentPriority = null; + } + this.latestAttributes = attributes; + this.latestUpdates.clear(); + this.priorities = priorityConfig.priorities; + /* Pair up the new child configs with the corresponding address lists, and + * update all existing children with their new configs */ + for (const [childName, childConfig] of priorityConfig.children.entries()) { + const chosenChildConfig = getFirstUsableConfig(childConfig.config); + if (chosenChildConfig !== null) { + const childAddresses = childAddressMap.get(childName) ?? []; + this.latestUpdates.set(childName, { + subchannelAddress: childAddresses, + lbConfig: chosenChildConfig, + }); + const existingChild = this.children.get(childName); + if (existingChild !== undefined) { + existingChild.updateAddressList( + childAddresses, + chosenChildConfig, + attributes + ); + } + } + } + // Deactivate all children that are no longer in the priority list + for (const [childName, child] of this.children.entries()) { + if (this.priorities.indexOf(childName) < 0) { + child.deactivate(); + } + } + // Only report connecting if there are no existing children + this.tryNextPriority(this.children.size === 0); + } + exitIdle(): void { + if (this.currentPriority !== null) { + this.children.get(this.priorities[this.currentPriority])?.exitIdle(); + } + } + resetBackoff(): void { + for (const child of this.children.values()) { + child.resetBackoff(); + } + } + destroy(): void { + for (const child of this.children.values()) { + child.destroy(); + } + this.children.clear(); + this.currentChildFromBeforeUpdate?.destroy(); + this.currentChildFromBeforeUpdate = null; + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, PriorityLoadBalancer); +} diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index dc4cf4e3..2e91ffc5 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -22,6 +22,7 @@ import { Picker } from './picker'; import { LoadBalancingConfig } from './load-balancing-config'; import * as load_balancer_pick_first from './load-balancer-pick-first'; import * as load_balancer_round_robin from './load-balancer-round-robin'; +import * as load_balancer_priority from './load-balancer-priority'; /** * A collection of functions associated with a channel that a load balancer @@ -137,4 +138,5 @@ export function getFirstUsableConfig( export function registerAll() { load_balancer_pick_first.setup(); load_balancer_round_robin.setup(); + load_balancer_priority.setup(); } From be31009d9a084e7c586a763ae2582dc856f80766 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 21 May 2020 10:04:29 -0700 Subject: [PATCH 2/3] Start failover timers when leaving IDLE --- .../grpc-js/src/load-balancer-priority.ts | 51 ++++++++++++------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-priority.ts b/packages/grpc-js/src/load-balancer-priority.ts index 94f7f560..94a4ace7 100644 --- a/packages/grpc-js/src/load-balancer-priority.ts +++ b/packages/grpc-js/src/load-balancer-priority.ts @@ -111,22 +111,31 @@ export class PriorityLoadBalancer implements LoadBalancer { this.parent.onChildStateChange(this); } + private startFailoverTimer() { + if (this.failoverTimer === null) { + this.failoverTimer = setTimeout(() => { + this.failoverTimer = null; + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); + }, DEFAULT_FAILOVER_TIME_MS); + } + } + updateAddressList( addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown } ): void { this.childBalancer.updateAddressList(addressList, lbConfig, attributes); - this.failoverTimer = setTimeout(() => { - this.failoverTimer = null; - this.updateState( - ConnectivityState.TRANSIENT_FAILURE, - new UnavailablePicker() - ); - }, DEFAULT_FAILOVER_TIME_MS); + this.startFailoverTimer(); } exitIdle() { + if (this.connectivityState === ConnectivityState.IDLE) { + this.startFailoverTimer(); + } this.childBalancer.exitIdle(); } @@ -215,6 +224,16 @@ export class PriorityLoadBalancer implements LoadBalancer { constructor(private channelControlHelper: ChannelControlHelper) {} + private updateState(state: ConnectivityState, picker: Picker) { + /* If switching to IDLE, use a QueuePicker attached to this load balancer + * so that when the picker calls exitIdle, that in turn calls exitIdle on + * the PriorityChildImpl, which will start the failover timer. */ + if (state === ConnectivityState.IDLE) { + picker = new QueuePicker(this); + } + this.channelControlHelper.updateState(state, picker); + } + private onChildStateChange(child: PriorityChildBalancer) { const childState = child.getConnectivityState(); if (child === this.currentChildFromBeforeUpdate) { @@ -222,7 +241,7 @@ export class PriorityLoadBalancer implements LoadBalancer { childState === ConnectivityState.READY || childState === ConnectivityState.IDLE ) { - this.channelControlHelper.updateState(childState, child.getPicker()); + this.updateState(childState, child.getPicker()); } else { this.currentChildFromBeforeUpdate = null; this.tryNextPriority(true); @@ -258,7 +277,7 @@ export class PriorityLoadBalancer implements LoadBalancer { } /* The currently selected child has updated state to something other than * TRANSIENT_FAILURE, so we pass that update along */ - this.channelControlHelper.updateState(childState, child.getPicker()); + this.updateState(childState, child.getPicker()); } private deleteChild(child: PriorityChildBalancer) { @@ -279,7 +298,7 @@ export class PriorityLoadBalancer implements LoadBalancer { private selectPriority(priority: number) { this.currentPriority = priority; const chosenChild = this.children.get(this.priorities[priority])!; - this.channelControlHelper.updateState( + this.updateState( chosenChild.getConnectivityState(), chosenChild.getPicker() ); @@ -302,10 +321,7 @@ export class PriorityLoadBalancer implements LoadBalancer { /* If the child doesn't already exist, create it and update it. */ if (child === undefined) { if (reportConnecting) { - this.channelControlHelper.updateState( - ConnectivityState.CONNECTING, - new QueuePicker(this) - ); + this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); } child = new this.PriorityChildImpl(this, childName); this.children.set(childName, child); @@ -332,17 +348,14 @@ export class PriorityLoadBalancer implements LoadBalancer { /* This child is still trying to connect. Wait until its failover timer * has ended to continue to the next one */ if (reportConnecting) { - this.channelControlHelper.updateState( - ConnectivityState.CONNECTING, - new QueuePicker(this) - ); + this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); } return; } } this.currentPriority = null; this.currentChildFromBeforeUpdate = null; - this.channelControlHelper.updateState( + this.updateState( ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({ code: Status.UNAVAILABLE, From a61dfb1527d028c6cf14114fe29ff10b3fa20eb4 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 28 May 2020 11:52:02 -0700 Subject: [PATCH 3/3] Some cleanup and fixes --- packages/grpc-js/src/load-balancer-priority.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-priority.ts b/packages/grpc-js/src/load-balancer-priority.ts index 94a4ace7..d3070715 100644 --- a/packages/grpc-js/src/load-balancer-priority.ts +++ b/packages/grpc-js/src/load-balancer-priority.ts @@ -100,9 +100,6 @@ export class PriorityLoadBalancer implements LoadBalancer { }, }); this.picker = new QueuePicker(this.childBalancer); - - this.deactivationTimer = setTimeout(() => {}, 0); - clearTimeout(this.deactivationTimer); } private updateState(connectivityState: ConnectivityState, picker: Picker) { @@ -415,7 +412,7 @@ export class PriorityLoadBalancer implements LoadBalancer { this.priorities = priorityConfig.priorities; /* Pair up the new child configs with the corresponding address lists, and * update all existing children with their new configs */ - for (const [childName, childConfig] of priorityConfig.children.entries()) { + for (const [childName, childConfig] of priorityConfig.children) { const chosenChildConfig = getFirstUsableConfig(childConfig.config); if (chosenChildConfig !== null) { const childAddresses = childAddressMap.get(childName) ?? []; @@ -434,7 +431,7 @@ export class PriorityLoadBalancer implements LoadBalancer { } } // Deactivate all children that are no longer in the priority list - for (const [childName, child] of this.children.entries()) { + for (const [childName, child] of this.children) { if (this.priorities.indexOf(childName) < 0) { child.deactivate(); }