diff --git a/packages/grpc-js-xds/gulpfile.ts b/packages/grpc-js-xds/gulpfile.ts index 6f17a402..2bb43a7f 100644 --- a/packages/grpc-js-xds/gulpfile.ts +++ b/packages/grpc-js-xds/gulpfile.ts @@ -63,6 +63,7 @@ const compile = checkTask(() => execNpmCommand('compile')); const runTests = checkTask(() => { process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION = 'true'; process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG = 'true'; + process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH = 'true'; return gulp.src(`${outDir}/test/**/*.js`) .pipe(mocha({reporter: 'mocha-jenkins-reporter', require: ['ts-node/register']})); diff --git a/packages/grpc-js-xds/src/environment.ts b/packages/grpc-js-xds/src/environment.ts index 530bb256..85890311 100644 --- a/packages/grpc-js-xds/src/environment.ts +++ b/packages/grpc-js-xds/src/environment.ts @@ -20,3 +20,4 @@ export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENA export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true'; export const EXPERIMENTAL_FEDERATION = (process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION ?? 'false') === 'true'; export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG ?? 'false') === 'true'; +export const EXPERIMENTAL_RING_HASH = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH ?? 'false') === 'true'; diff --git a/packages/grpc-js-xds/src/http-filter.ts b/packages/grpc-js-xds/src/http-filter.ts index 29ce5958..f8da5b82 100644 --- a/packages/grpc-js-xds/src/http-filter.ts +++ b/packages/grpc-js-xds/src/http-filter.ts @@ -116,7 +116,7 @@ export function validateTopLevelFilter(httpFilter: HttpFilter__Output): boolean try { typeUrl = getTopLevelFilterUrl(encodedConfig); } catch (e) { - trace(httpFilter.name + ' validation failed with error ' + e.message); + trace(httpFilter.name + ' validation failed with error ' + (e as Error).message); return false; } const registryEntry = FILTER_REGISTRY.get(typeUrl); @@ -243,4 +243,4 @@ export function createHttpFilter(config: HttpFilterConfig, overrideConfig?: Http } else { return null; } -} \ No newline at end of file +} diff --git a/packages/grpc-js-xds/src/index.ts b/packages/grpc-js-xds/src/index.ts index 95c26a20..70aa5bef 100644 --- a/packages/grpc-js-xds/src/index.ts +++ b/packages/grpc-js-xds/src/index.ts @@ -23,6 +23,7 @@ import * as load_balancer_priority from './load-balancer-priority'; import * as load_balancer_weighted_target from './load-balancer-weighted-target'; import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager'; import * as xds_wrr_locality from './load-balancer-xds-wrr-locality'; +import * as ring_hash from './load-balancer-ring-hash'; import * as router_filter from './http-filter/router-filter'; import * as fault_injection_filter from './http-filter/fault-injection-filter'; import * as csds from './csds'; @@ -41,6 +42,7 @@ export function register() { load_balancer_weighted_target.setup(); load_balancer_xds_cluster_manager.setup(); xds_wrr_locality.setup(); + ring_hash.setup(); router_filter.setup(); fault_injection_filter.setup(); csds.setup(); diff --git a/packages/grpc-js-xds/src/load-balancer-priority.ts b/packages/grpc-js-xds/src/load-balancer-priority.ts index 4a4acb47..54e01fa8 100644 --- a/packages/grpc-js-xds/src/load-balancer-priority.ts +++ b/packages/grpc-js-xds/src/load-balancer-priority.ts @@ -41,9 +41,26 @@ const DEFAULT_FAILOVER_TIME_MS = 10_000; const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000; export interface LocalityEndpoint extends Endpoint { + /** + * A sequence of strings that determines how to divide endpoints up in priority and + * weighted_target. + */ localityPath: string[]; + /** + * The locality this endpoint is in. Used in wrr_locality and xds_cluster_impl. + */ locality: Locality__Output; - weight: number; + /** + * The load balancing weight for the entire locality that contains this + * endpoint. Used in xds_wrr_locality. + */ + localityWeight: number; + /** + * The overall load balancing weight for this endpoint, calculated as the + * product of the load balancing weight for this endpoint within its locality + * and the load balancing weight of the locality. Used in ring_hash. + */ + endpointWeight: number; }; export function isLocalityEndpoint( @@ -317,7 +334,7 @@ export class PriorityLoadBalancer implements LoadBalancer { * so that when the picker calls exitIdle, that in turn calls exitIdle on * the PriorityChildImpl, which will start the failover timer. */ if (state === ConnectivityState.IDLE) { - picker = new QueuePicker(this); + picker = new QueuePicker(this, picker); } this.channelControlHelper.updateState(state, picker); } diff --git a/packages/grpc-js-xds/src/load-balancer-ring-hash.ts b/packages/grpc-js-xds/src/load-balancer-ring-hash.ts new file mode 100644 index 00000000..a124d3b8 --- /dev/null +++ b/packages/grpc-js-xds/src/load-balancer-ring-hash.ts @@ -0,0 +1,507 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { experimental, logVerbosity, connectivityState, status, Metadata, ChannelOptions, LoadBalancingConfig } from '@grpc/grpc-js'; +import { isLocalityEndpoint } from './load-balancer-priority'; +import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; +import LeafLoadBalancer = experimental.LeafLoadBalancer; +import Endpoint = experimental.Endpoint; +import Picker = experimental.Picker; +import PickArgs = experimental.PickArgs; +import PickResult = experimental.PickResult; +import PickResultType = experimental.PickResultType; +import LoadBalancer = experimental.LoadBalancer; +import ChannelControlHelper = experimental.ChannelControlHelper; +import createChildChannelControlHelper = experimental.createChildChannelControlHelper; +import UnavailablePicker = experimental.UnavailablePicker; +import subchannelAddressToString = experimental.subchannelAddressToString; +import registerLoadBalancerType = experimental.registerLoadBalancerType; +import EndpointMap = experimental.EndpointMap; +import { loadXxhashApi, xxhashApi } from './xxhash'; +import { EXPERIMENTAL_RING_HASH } from './environment'; +import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util'; +import { RingHash__Output } from './generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash'; +import { Any__Output } from './generated/google/protobuf/Any'; +import { TypedExtensionConfig__Output } from './generated/envoy/config/core/v3/TypedExtensionConfig'; +import { LoadBalancingPolicy__Output } from './generated/envoy/config/cluster/v3/LoadBalancingPolicy'; +import { registerLbPolicy } from './lb-policy-registry'; + +const TRACER_NAME = 'ring_hash'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +const TYPE_NAME = 'ring_hash'; + +const DEFAULT_MIN_RING_SIZE = 1024; +const DEFAULT_MAX_RING_SIZE = 4096; +const ABSOLUTE_MAX_RING_SIZE = 8_388_608; +const DEFAULT_RING_SIZE_CAP = 4096; + +class RingHashLoadBalancingConfig implements TypedLoadBalancingConfig { + private minRingSize: number; + private maxRingSize: number; + constructor(minRingSize?: number, maxRingSize?: number) { + this.minRingSize = Math.min( + minRingSize ?? DEFAULT_MIN_RING_SIZE, + ABSOLUTE_MAX_RING_SIZE + ); + this.maxRingSize = Math.min( + maxRingSize ?? DEFAULT_MAX_RING_SIZE, + ABSOLUTE_MAX_RING_SIZE + ); + } + getLoadBalancerName(): string { + return TYPE_NAME; + } + toJsonObject(): object { + return { + [TYPE_NAME]: { + min_ring_size: this.minRingSize, + max_ring_size: this.maxRingSize, + } + }; + } + getMinRingSize() { + return this.minRingSize; + } + getMaxRingSize() { + return this.maxRingSize; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + static createFromJson(obj: any): TypedLoadBalancingConfig { + if ('min_ring_size' in obj) { + if (typeof obj.min_ring_size === 'number') { + if (obj.min_ring_size > ABSOLUTE_MAX_RING_SIZE) { + throw new Error(`ring_hash config field min_ring_size exceeds the cap of ${ABSOLUTE_MAX_RING_SIZE}: ${obj.min_ring_size}`); + } + } else { + throw new Error( + 'ring_hash config field min_ring_size must be a number if provided' + ); + } + } + if ('max_ring_size' in obj) { + if (typeof obj.max_ring_size === 'number') { + if (obj.max_ring_size > ABSOLUTE_MAX_RING_SIZE) { + throw new Error(`ring_hash config field max_ring_size exceeds the cap of ${ABSOLUTE_MAX_RING_SIZE}: ${obj.max_ring_size}`); + } + } else { + throw new Error( + 'ring_hash config field max_ring_size must be a number if provided' + ); + } + } + return new RingHashLoadBalancingConfig( + obj.min_ring_size, + obj.max_ring_size + ); + } +} + +interface RingEntry { + leafBalancer: LeafLoadBalancer; + hash: bigint; +} + +interface EndpointWeight { + endpoint: Endpoint; + weight: number; + normalizedWeight: number; +} + +class RingHashPicker implements Picker { + constructor(private ring: RingEntry[]) {} + /** + * Find the least index in the ring with a hash greater than or equal to the + * hash parameter, or 0 if no such index exists. + * @param hash + */ + private findIndexForHash(hash: bigint): number { + // Binary search to find the target index + let low = 0; + let high = this.ring.length; + let index = 0; + while (low <= high) { + /* Commonly in binary search, this operation can overflow and result in + * the wrong value. However, in this case the ring size is absolutely + * limtied to 1<<23, so low+high < MAX_SAFE_INTEGER */ + index = Math.floor((low + high) / 2); + if (index === this.ring.length) { + index = 0; + break; + } + const midval = this.ring[index].hash; + const midval1 = index === 0 ? 0n : this.ring[index - 1].hash; + if (hash <= midval && hash > midval1) { + break; + } + if (midval < hash) { + low = index + 1; + } else { + high = index - 1; + } + if (low > high) { + index = 0; + break; + } + } + return index; + } + pick(pickArgs: PickArgs): PickResult { + trace('Pick called. Hash=' + pickArgs.extraPickInfo.hash); + const firstIndex = this.findIndexForHash( + BigInt(pickArgs.extraPickInfo.hash) + ); + for (let i = 0; i < this.ring.length; i++) { + const index = (firstIndex + i) % this.ring.length; + const entryState = this.ring[index].leafBalancer.getConnectivityState(); + if (entryState === connectivityState.READY) { + return this.ring[index].leafBalancer.getPicker().pick(pickArgs); + } + if (entryState === connectivityState.IDLE) { + this.ring[index].leafBalancer.startConnecting(); + return { + pickResultType: PickResultType.QUEUE, + subchannel: null, + status: null, + onCallStarted: null, + onCallEnded: null, + }; + } + if (entryState === connectivityState.CONNECTING) { + return { + pickResultType: PickResultType.QUEUE, + subchannel: null, + status: null, + onCallStarted: null, + onCallEnded: null, + }; + } + } + return { + pickResultType: PickResultType.TRANSIENT_FAILURE, + status: { + code: status.UNAVAILABLE, + details: + 'ring_hash: invalid state: all child balancers in TRANSIENT_FAILURE', + metadata: new Metadata(), + }, + subchannel: null, + onCallStarted: null, + onCallEnded: null, + }; + } +} + +class RingHashLoadBalancer implements LoadBalancer { + /** + * Tracks endpoint repetition across address updates, to use an appropriate + * existing leaf load balancer for the same endpoint when possible. + */ + private leafMap = new EndpointMap(); + /** + * Tracks endpoints from a single address update, with their associated + * weights aggregated from all weights associated with that endpoint in that + * update. + */ + private leafWeightMap = new EndpointMap(); + private childChannelControlHelper: ChannelControlHelper; + private updatesPaused = false; + private currentState: connectivityState = connectivityState.IDLE; + private ring: RingEntry[] = []; + private ringHashSizeCap = DEFAULT_RING_SIZE_CAP; + constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) { + this.childChannelControlHelper = createChildChannelControlHelper( + channelControlHelper, + { + updateState: (state, picker) => { + this.calculateAndUpdateState(); + /* If this LB policy is in the TRANSIENT_FAILURE state, requests will + * not trigger new connections, so we need to explicitly try connecting + * to other endpoints that are currently IDLE to try to eventually + * connect to something. */ + if ( + state === connectivityState.TRANSIENT_FAILURE && + this.currentState === connectivityState.TRANSIENT_FAILURE + ) { + for (const leaf of this.leafMap.values()) { + const leafState = leaf.getConnectivityState(); + if (leafState === connectivityState.CONNECTING) { + break; + } + if (leafState === connectivityState.IDLE) { + leaf.startConnecting(); + break; + } + } + } + }, + } + ); + if (options['grpc.lb.ring_hash.ring_size_cap'] !== undefined) { + this.ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap']; + } + } + + private calculateAndUpdateState() { + if (this.updatesPaused) { + return; + } + const stateCounts = { + [connectivityState.READY]: 0, + [connectivityState.TRANSIENT_FAILURE]: 0, + [connectivityState.CONNECTING]: 0, + [connectivityState.IDLE]: 0, + [connectivityState.SHUTDOWN]: 0, + }; + for (const leaf of this.leafMap.values()) { + stateCounts[leaf.getConnectivityState()] += 1; + } + if (stateCounts[connectivityState.READY] > 0) { + this.updateState(connectivityState.READY, new RingHashPicker(this.ring)); + // REPORT READY + } else if (stateCounts[connectivityState.TRANSIENT_FAILURE] > 1) { + this.updateState( + connectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); + } else if (stateCounts[connectivityState.CONNECTING] > 0) { + this.updateState( + connectivityState.CONNECTING, + new RingHashPicker(this.ring) + ); + } else if ( + stateCounts[connectivityState.TRANSIENT_FAILURE] > 0 && + this.leafMap.size > 1 + ) { + this.updateState( + connectivityState.CONNECTING, + new RingHashPicker(this.ring) + ); + } else if (stateCounts[connectivityState.IDLE] > 0) { + this.updateState(connectivityState.IDLE, new RingHashPicker(this.ring)); + } else { + this.updateState( + connectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); + } + } + + private updateState(newState: connectivityState, picker: Picker) { + trace( + connectivityState[this.currentState] + + ' -> ' + + connectivityState[newState] + ); + this.currentState = newState; + this.channelControlHelper.updateState(newState, picker); + } + + private constructRing( + endpointList: Endpoint[], + config: RingHashLoadBalancingConfig + ) { + this.ring = []; + const endpointWeights: EndpointWeight[] = []; + let weightSum = 0; + for (const endpoint of endpointList) { + const weight = this.leafWeightMap.get(endpoint) ?? 1; + endpointWeights.push({ endpoint, weight, normalizedWeight: 0 }); + weightSum += weight; + } + /* The normalized weights sum to 1, with some small potential error due to + * the limitation of floating point precision. */ + let minNormalizedWeight = 1; + for (const endpointWeight of endpointWeights) { + endpointWeight.normalizedWeight = endpointWeight.weight / weightSum; + minNormalizedWeight = Math.min( + endpointWeight.normalizedWeight, + minNormalizedWeight + ); + } + const minRingSize = Math.min(config.getMinRingSize(), this.ringHashSizeCap); + const maxRingSize = Math.min(config.getMaxRingSize(), this.ringHashSizeCap); + /* Calculate a scale factor that meets the following conditions: + * 1. The result is between minRingSize and maxRingSize, inclusive + * 2. The smallest normalized weight is scaled to a whole number, if it + * does not violate the previous condition. + * The size of the ring is ceil(scale) + */ + const scale = Math.min( + Math.ceil(minNormalizedWeight * minRingSize) / minNormalizedWeight, + maxRingSize + ); + trace('Creating a ring with size ' + Math.ceil(scale)); + /* For each endpoint, create a number of entries proportional to its + * weight, such that the total number of entries is equal to ceil(scale). + */ + let currentHashes = 0; + let targetHashes = 0; + for (const endpointWeight of endpointWeights) { + const addressString = subchannelAddressToString( + endpointWeight.endpoint.addresses[0] + ); + targetHashes += scale * endpointWeight.normalizedWeight; + const leafBalancer = this.leafMap.get(endpointWeight.endpoint); + if (!leafBalancer) { + throw new Error( + 'ring_hash: Invalid state: endpoint found in leafWeightMap but not in leafMap' + ); + } + let count = 0; + while (currentHashes < targetHashes) { + const hashKey = `${addressString}_${count}`; + const hash = xxhashApi!.h64(hashKey, 0n); + this.ring.push({ hash, leafBalancer }); + currentHashes++; + count++; + } + } + /* The ring is sorted by the hash so that it can be efficiently searched + * for a hash that is closest to any arbitrary hash. */ + this.ring.sort((a, b) => { + if (a.hash > b.hash) { + return 1; + } else if (a.hash < b.hash) { + return -1; + } else { + return 0; + } + }); + } + + updateAddressList( + endpointList: Endpoint[], + lbConfig: TypedLoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + if (!(lbConfig instanceof RingHashLoadBalancingConfig)) { + trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); + return; + } + trace('Received update with config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); + this.updatesPaused = true; + this.leafWeightMap.clear(); + const dedupedEndpointList: Endpoint[] = []; + for (const endpoint of endpointList) { + const leafBalancer = this.leafMap.get(endpoint); + if (leafBalancer) { + leafBalancer.updateEndpoint(endpoint); + } else { + this.leafMap.set( + endpoint, + new LeafLoadBalancer(endpoint, this.childChannelControlHelper, this.options) + ); + } + const weight = this.leafWeightMap.get(endpoint); + if (weight === undefined) { + dedupedEndpointList.push(endpoint); + } + this.leafWeightMap.set(endpoint, (weight ?? 0) + (isLocalityEndpoint(endpoint) ? endpoint.endpointWeight : 1)); + } + const removedLeaves = this.leafMap.deleteMissing(endpointList); + for (const leaf of removedLeaves) { + leaf.destroy(); + } + loadXxhashApi().then(() => { + this.constructRing(dedupedEndpointList, lbConfig); + this.updatesPaused = false; + this.calculateAndUpdateState(); + }); + } + exitIdle(): void { + /* This operation does not make sense here. We don't want to make the whole + * balancer exit idle, and instead propagate that to individual chlidren as + * relevant. */ + } + resetBackoff(): void { + // There is no backoff to reset here + } + destroy(): void { + this.ring = []; + for (const child of this.leafMap.values()) { + child.destroy(); + } + this.leafMap.clear(); + this.leafWeightMap.clear(); + } + getTypeName(): string { + return TYPE_NAME; + } +} + +const RING_HASH_TYPE_URL = 'type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash'; + +const resourceRoot = loadProtosWithOptionsSync([ + 'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto'], { + keepCase: true, + includeDirs: [ + // Paths are relative to src/build + __dirname + '/../../deps/envoy-api/', + __dirname + '/../../deps/xds/', + __dirname + '/../../deps/protoc-gen-validate' + ], + } +); + +const toObjectOptions = { + longs: String, + enums: String, + defaults: true, + oneofs: true +} + +function decodeRingHash(message: Any__Output): RingHash__Output { + const name = message.type_url.substring(message.type_url.lastIndexOf('/') + 1); + const type = resourceRoot.lookup(name); + if (type) { + const decodedMessage = (type as any).decode(message.value); + return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as RingHash__Output; + } else { + throw new Error(`TypedStruct parsing error: unexpected type URL ${message.type_url}`); + } +} + +function convertToLoadBalancingPolicy(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig { + if (protoPolicy.typed_config?.type_url !== RING_HASH_TYPE_URL) { + throw new Error(`Ring Hash LB policy parsing error: unexpected type URL ${protoPolicy.typed_config?.type_url}`); + } + const ringHashMessage = decodeRingHash(protoPolicy.typed_config); + if (ringHashMessage.hash_function !== 'XX_HASH') { + throw new Error(`Ring Hash LB policy parsing error: unexpected hash function ${ringHashMessage.hash_function}`); + } + return { + [TYPE_NAME]: { + min_ring_size: ringHashMessage.minimum_ring_size?.value ?? 1024, + max_ring_size: ringHashMessage.maximum_ring_size?.value ?? 8_388_608 + } + }; +} + +export function setup() { + if (EXPERIMENTAL_RING_HASH) { + registerLoadBalancerType( + TYPE_NAME, + RingHashLoadBalancer, + RingHashLoadBalancingConfig + ); + registerLbPolicy(RING_HASH_TYPE_URL, convertToLoadBalancingPolicy); + } +} diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts index 3560d848..99059dca 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -204,35 +204,7 @@ class XdsClusterManager implements LoadBalancer { } else { connectivityState = ConnectivityState.TRANSIENT_FAILURE; } - /* For each of the states CONNECTING, IDLE, and TRANSIENT_FAILURE, there is - * exactly one corresponding picker, so if the state is one of those and - * that does not change, no new information is provided by passing the - * new state upward. */ - if (connectivityState === this.currentState && connectivityState !== ConnectivityState.READY) { - return; - } - let picker: Picker; - - switch (connectivityState) { - case ConnectivityState.READY: - picker = new XdsClusterManagerPicker(pickerMap); - break; - case ConnectivityState.CONNECTING: - case ConnectivityState.IDLE: - picker = new QueuePicker(this); - break; - default: - picker = new UnavailablePicker({ - code: Status.UNAVAILABLE, - details: 'xds_cluster_manager: all children report state TRANSIENT_FAILURE', - metadata: new Metadata() - }); - } - trace( - 'Transitioning to ' + - ConnectivityState[connectivityState] - ); - this.channelControlHelper.updateState(connectivityState, picker); + this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap)); } updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts index c4bf984f..29c0b6f3 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts @@ -115,10 +115,15 @@ class XdsClusterResolverLoadBalancingConfig implements TypedLoadBalancingConfig } } +interface WeightedEndpoint { + endpoint: Endpoint; + weight: number; +} + interface LocalityEntry { locality: Locality__Output; weight: number; - endpoints: Endpoint[]; + endpoints: WeightedEndpoint[]; } interface PriorityEntry { @@ -166,16 +171,19 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt if (!endpoint.load_balancing_weight) { continue; } - const endpoints: Endpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map( + const endpoints: WeightedEndpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map( (lbEndpoint) => { /* The validator in the XdsClient class ensures that each endpoint has * a socket_address with an IP address and a port_value. */ const socketAddress = lbEndpoint.endpoint!.address!.socket_address!; return { - addresses: [{ - host: socketAddress.address!, - port: socketAddress.port_value!, - }] + endpoint: { + addresses: [{ + host: socketAddress.address!, + port: socketAddress.port_value!, + }] + }, + weight: lbEndpoint.load_balancing_weight?.value ?? 1 }; } ); @@ -211,7 +219,7 @@ function getDnsPriorities(endpoints: Endpoint[]): PriorityEntry[] { sub_zone: '' }, weight: 1, - endpoints: endpoints + endpoints: endpoints.map(endpoint => ({endpoint: endpoint, weight: 1})) }], dropCategories: [] }]; @@ -295,15 +303,16 @@ export class XdsClusterResolver implements LoadBalancer { newPriorityNames[priority] = newPriorityName; for (const localityObj of priorityEntry.localities) { - for (const endpoint of localityObj.endpoints) { + for (const weightedEndpoint of localityObj.endpoints) { endpointList.push({ localityPath: [ newPriorityName, localityToName(localityObj.locality), ], locality: localityObj.locality, - weight: localityObj.weight, - ...endpoint + localityWeight: localityObj.weight, + endpointWeight: localityObj.weight * weightedEndpoint.weight, + ...weightedEndpoint.endpoint }); } newLocalityPriorities.set(localityToName(localityObj.locality), priority); diff --git a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts index f3fbcd51..3fb57d6e 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts @@ -90,7 +90,7 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer { if (!(localityName in targets)) { targets[localityName] = { child_policy: lbConfig.getChildPolicy(), - weight: address.weight + weight: address.localityWeight }; } } diff --git a/packages/grpc-js-xds/src/matcher.ts b/packages/grpc-js-xds/src/matcher.ts index 148df7f8..b657d32c 100644 --- a/packages/grpc-js-xds/src/matcher.ts +++ b/packages/grpc-js-xds/src/matcher.ts @@ -71,7 +71,7 @@ export class SafeRegexValueMatcher implements ValueMatcher { const numberRegex = new RE2(/^-?\d+$/u); export class RangeValueMatcher implements ValueMatcher { - constructor(private start: BigInt, private end: BigInt) {} + constructor(private start: bigint, private end: bigint) {} apply(value: string) { if (!numberRegex.test(value)) { @@ -264,4 +264,4 @@ export class FullMatcher implements Matcher { headers: ${this.headerMatchers.map(matcher => matcher.toString()).join('\n\t')} fraction: ${this.fraction ? fractionToString(this.fraction): 'none'}`; } -} \ No newline at end of file +} diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index cd830d52..3acdd232 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -34,18 +34,19 @@ import { HeaderMatcher__Output } from './generated/envoy/config/route/v3/HeaderM import ConfigSelector = experimental.ConfigSelector; import { ContainsValueMatcher, ExactValueMatcher, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher'; import { envoyFractionToFraction, Fraction } from "./fraction"; -import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action'; +import { HashPolicy, RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action'; import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resources'; import Duration = experimental.Duration; import { Duration__Output } from './generated/google/protobuf/Duration'; import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter'; -import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY } from './environment'; +import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY, EXPERIMENTAL_RING_HASH } from './environment'; import Filter = experimental.Filter; import FilterFactory = experimental.FilterFactory; import { BootstrapInfo, loadBootstrapInfo, validateBootstrapConfig } from './xds-bootstrap'; import { ListenerResourceType } from './xds-resource-type/listener-resource-type'; import { RouteConfigurationResourceType } from './xds-resource-type/route-config-resource-type'; import { protoDurationToDuration } from './duration'; +import { loadXxhashApi } from './xxhash'; const TRACER_NAME = 'xds_resolver'; @@ -381,7 +382,11 @@ class XdsResolver implements Resolver { } } - private handleRouteConfig(routeConfig: RouteConfiguration__Output) { + private async handleRouteConfig(routeConfig: RouteConfiguration__Output) { + /* We need to load the xxhash API before this function finishes, because + * it is invoked in the config selector, which can be called immediately + * after this function returns. */ + await loadXxhashApi(); this.latestRouteConfig = routeConfig; /* Select the virtual host using the default authority override if it * exists, and the channel target otherwise. */ @@ -456,6 +461,26 @@ class XdsResolver implements Resolver { } } } + const hashPolicies: HashPolicy[] = []; + if (EXPERIMENTAL_RING_HASH) { + for (const routeHashPolicy of route.route!.hash_policy) { + if (routeHashPolicy.policy_specifier === 'header') { + const headerPolicy = routeHashPolicy.header!; + hashPolicies.push({ + type: 'HEADER', + terminal: routeHashPolicy.terminal, + headerName: headerPolicy.header_name, + regex: headerPolicy.regex_rewrite?.pattern ? new RE2(headerPolicy.regex_rewrite.pattern.regex, 'ug') : undefined, + regexSubstitution: headerPolicy.regex_rewrite?.substitution + }); + } else if (routeHashPolicy.policy_specifier === 'filter_state' && routeHashPolicy.filter_state!.key === 'io.grpc.channel_id') { + hashPolicies.push({ + type: 'CHANNEL_ID', + terminal: routeHashPolicy.terminal + }); + } + } + } switch (route.route!.cluster_specifier) { case 'cluster_header': continue; @@ -483,7 +508,7 @@ class XdsResolver implements Resolver { } } } - routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories); + routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories, hashPolicies); break; } case 'weighted_clusters': { @@ -525,7 +550,7 @@ class XdsResolver implements Resolver { } weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories}); } - routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy}); + routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy}, hashPolicies); break; } default: @@ -554,7 +579,7 @@ class XdsResolver implements Resolver { this.clusterRefcounts.set(name, {inLastConfig: true, refCount: 0}); } } - const configSelector: ConfigSelector = (methodName, metadata) => { + const configSelector: ConfigSelector = (methodName, metadata, channelId) => { for (const {matcher, action} of matchList) { if (matcher.apply(methodName, metadata)) { const clusterResult = action.getCluster(); @@ -562,10 +587,11 @@ class XdsResolver implements Resolver { const onCommitted = () => { this.unrefCluster(clusterResult.name); } + const hash = action.getHash(metadata, channelId); return { methodConfig: clusterResult.methodConfig, onCommitted: onCommitted, - pickInformation: {cluster: clusterResult.name}, + pickInformation: {cluster: clusterResult.name, hash: `${hash}`}, status: status.OK, dynamicFilterFactories: clusterResult.dynamicFilterFactories }; @@ -573,8 +599,8 @@ class XdsResolver implements Resolver { } return { methodConfig: {name: []}, - // cluster won't be used here, but it's set because of some TypeScript weirdness - pickInformation: {cluster: ''}, + // These fields won't be used here, but they're set because of some TypeScript weirdness + pickInformation: {cluster: '', hash: ''}, status: status.UNAVAILABLE, dynamicFilterFactories: [] }; diff --git a/packages/grpc-js-xds/src/route-action.ts b/packages/grpc-js-xds/src/route-action.ts index 83530f1b..2f87fee9 100644 --- a/packages/grpc-js-xds/src/route-action.ts +++ b/packages/grpc-js-xds/src/route-action.ts @@ -14,10 +14,12 @@ * limitations under the License. */ -import { MethodConfig, experimental } from '@grpc/grpc-js'; +import { Metadata, MethodConfig, experimental } from '@grpc/grpc-js'; import Duration = experimental.Duration; import Filter = experimental.Filter; import FilterFactory = experimental.FilterFactory; +import { RE2 } from 're2-wasm'; +import { xxhashApi } from './xxhash'; export interface ClusterResult { name: string; @@ -28,6 +30,7 @@ export interface ClusterResult { export interface RouteAction { toString(): string; getCluster(): ClusterResult; + getHash(metadata: Metadata, channelId: number): bigint; } function durationToLogString(duration: Duration) { @@ -39,8 +42,83 @@ function durationToLogString(duration: Duration) { } } +export interface HashPolicy { + type: 'HEADER' | 'CHANNEL_ID'; + terminal: boolean; + headerName?: string; + regex?: RE2; + regexSubstitution?: string; +} + +/** + * Must be called only after xxhash.loadXxhashApi() resolves. + * @param hashPolicies + * @param metadata + * @param channelId + */ +function getHash(hashPolicies: HashPolicy[], metadata: Metadata, channelId: number): bigint { + let hash: bigint | null = null; + for (const policy of hashPolicies) { + let newHash: bigint | null = null; + switch (policy.type) { + case 'CHANNEL_ID': + newHash = xxhashApi!.h64(`${channelId}`, 0n); + break; + case 'HEADER': { + if (!policy.headerName) { + break; + } + if (policy.headerName.endsWith('-bin')) { + break; + } + let headerString: string; + if (policy.headerName === 'content-type') { + headerString = 'application/grpc'; + } else { + const headerValues = metadata.get(policy.headerName); + if (headerValues.length === 0) { + break; + } + headerString = headerValues.join(','); + } + let rewrittenHeaderString = headerString; + if (policy.regex && policy.regexSubstitution) { + /* The JS string replace method uses $-prefixed patterns to produce + * other strings. See + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String/replace#specifying_a_string_as_the_replacement + * RE2-based regex substitutions use \n where n is a number to refer + * to capture group n, and they otherwise have no special replacement + * patterns. See + * https://github.com/envoyproxy/envoy/blob/2443032526cf6e50d63d35770df9473dd0460fc0/api/envoy/type/matcher/v3/regex.proto#L79-L87 + * We convert an RE2 regex substitution into a string substitution by + * first replacing each "$" with "$$" (which produces "$" in the + * output), and then replace each "\n" for any whole number n with + * "$n". */ + const regexSubstitution = policy.regexSubstitution.replace(/\$/g, '$$$$').replace(/\\(\d+)/g, '$$$1'); + rewrittenHeaderString = headerString.replace(policy.regex, regexSubstitution); + } + newHash = xxhashApi!.h64(rewrittenHeaderString, 0n); + break; + } + } + if (hash === null) { + hash = newHash; + } else if (newHash !== null) { + hash = ((hash << 1n) | (hash >> 63n)) ^ newHash; + } + if (policy.terminal && hash !== null) { + break; + } + } + if (hash === null) { + return xxhashApi!.h64(`${Math.random()}`, 0n); + } else { + return hash; + } +} + export class SingleClusterRouteAction implements RouteAction { - constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory[]) {} + constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory[], private hashPolicies: HashPolicy[]) {} getCluster() { return { @@ -50,6 +128,10 @@ export class SingleClusterRouteAction implements RouteAction { }; } + getHash(metadata: Metadata, channelId: number): bigint { + return getHash(this.hashPolicies, metadata, channelId); + } + toString() { return 'SingleCluster(' + this.cluster + ', ' + JSON.stringify(this.methodConfig) + ')'; } @@ -72,7 +154,7 @@ export class WeightedClusterRouteAction implements RouteAction { * The weighted cluster choices represented as a CDF */ private clusterChoices: ClusterChoice[]; - constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig) { + constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig, private hashPolicies: HashPolicy[]) { this.clusterChoices = []; let lastNumerator = 0; for (const clusterWeight of clusters) { @@ -96,6 +178,10 @@ export class WeightedClusterRouteAction implements RouteAction { return {name: '', methodConfig: this.methodConfig, dynamicFilterFactories: []}; } + getHash(metadata: Metadata, channelId: number): bigint { + return getHash(this.hashPolicies, metadata, channelId); + } + toString() { const clusterListString = this.clusters.map(({name, weight}) => '(' + name + ':' + weight + ')').join(', ') return 'WeightedCluster(' + clusterListString + ', ' + JSON.stringify(this.methodConfig) + ')'; diff --git a/packages/grpc-js-xds/src/xds-bootstrap.ts b/packages/grpc-js-xds/src/xds-bootstrap.ts index fccd3edc..536439dd 100644 --- a/packages/grpc-js-xds/src/xds-bootstrap.ts +++ b/packages/grpc-js-xds/src/xds-bootstrap.ts @@ -357,14 +357,14 @@ export function loadBootstrapInfo(): BootstrapInfo { try { rawBootstrap = fs.readFileSync(bootstrapPath, { encoding: 'utf8'}); } catch (e) { - throw new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${e.message}`); + throw new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${(e as Error).message}`); } try { const parsedFile = JSON.parse(rawBootstrap); loadedBootstrapInfo = validateBootstrapConfig(parsedFile); return loadedBootstrapInfo; } catch (e) { - throw new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}`) + throw new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${(e as Error).message}`) } } @@ -383,7 +383,7 @@ export function loadBootstrapInfo(): BootstrapInfo { loadedBootstrapInfo = validateBootstrapConfig(parsedConfig); } catch (e) { throw new Error( - `Failed to parse xDS bootstrap config from environment variable GRPC_XDS_BOOTSTRAP_CONFIG with error ${e.message}` + `Failed to parse xDS bootstrap config from environment variable GRPC_XDS_BOOTSTRAP_CONFIG with error ${(e as Error).message}` ); } diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 464e2659..baabf2e7 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -208,14 +208,14 @@ class AdsResponseParser { try { decodeResult = this.result.type.decode(decodeContext, resource); } catch (e) { - this.result.errors.push(`${errorPrefix} ${e.message}`); + this.result.errors.push(`${errorPrefix} ${(e as Error).message}`); return; } let parsedName: XdsResourceName; try { parsedName = parseXdsResourceName(decodeResult.name, this.result.type!.getTypeUrl()); } catch (e) { - this.result.errors.push(`${errorPrefix} ${e.message}`); + this.result.errors.push(`${errorPrefix} ${(e as Error).message}`); return; } this.adsCallState.typeStates.get(this.result.type!)?.subscribedResources.get(parsedName.authority)?.get(parsedName.key)?.markSeen(); diff --git a/packages/grpc-js-xds/src/xds-resource-type/cluster-resource-type.ts b/packages/grpc-js-xds/src/xds-resource-type/cluster-resource-type.ts index 1e8ea41f..c13a91d7 100644 --- a/packages/grpc-js-xds/src/xds-resource-type/cluster-resource-type.ts +++ b/packages/grpc-js-xds/src/xds-resource-type/cluster-resource-type.ts @@ -21,7 +21,7 @@ import { LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js"; import { XdsServerConfig } from "../xds-bootstrap"; import { Duration__Output } from "../generated/google/protobuf/Duration"; import { OutlierDetection__Output } from "../generated/envoy/config/cluster/v3/OutlierDetection"; -import { EXPERIMENTAL_CUSTOM_LB_CONFIG, EXPERIMENTAL_OUTLIER_DETECTION } from "../environment"; +import { EXPERIMENTAL_CUSTOM_LB_CONFIG, EXPERIMENTAL_OUTLIER_DETECTION, EXPERIMENTAL_RING_HASH } from "../environment"; import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster"; import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value"; import { Any__Output } from "../generated/google/protobuf/Any"; @@ -150,6 +150,27 @@ export class ClusterResourceType extends XdsResourceType { child_policy: [{round_robin: {}}] } }; + } else if(EXPERIMENTAL_RING_HASH && message.lb_policy === 'RING_HASH') { + if (!message.ring_hash_lb_config) { + return null; + } + if (message.ring_hash_lb_config.hash_function !== 'XX_HASH') { + return null; + } + const minRingSize = message.ring_hash_lb_config.minimum_ring_size ? Number(message.ring_hash_lb_config.minimum_ring_size.value) : 1024; + if (minRingSize > 8_388_608) { + return null; + } + const maxRingSize = message.ring_hash_lb_config.maximum_ring_size ? Number(message.ring_hash_lb_config.maximum_ring_size.value) : 8_388_608; + if (maxRingSize > 8_388_608) { + return null; + } + lbPolicyConfig = { + ring_hash: { + min_ring_size: minRingSize, + max_ring_size: maxRingSize + } + }; } else { return null; } diff --git a/packages/grpc-js-xds/src/xxhash.ts b/packages/grpc-js-xds/src/xxhash.ts new file mode 100644 index 00000000..63f68af2 --- /dev/null +++ b/packages/grpc-js-xds/src/xxhash.ts @@ -0,0 +1,31 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* The simpler `import xxhash from 'xxhash-wasm';` doesn't compile correctly + * to CommonJS require calls for some reason, so we use this import to get + * the type, and then an explicit require call to get the actual value. */ +import xxhashImport from 'xxhash-wasm'; +const xxhash: typeof xxhashImport = require('xxhash-wasm'); + +export let xxhashApi: Awaited> | null = null; + +export async function loadXxhashApi() { + if (!xxhashApi) { + xxhashApi = await xxhash(); + } + return xxhashApi; +} diff --git a/packages/grpc-js-xds/test/framework.ts b/packages/grpc-js-xds/test/framework.ts index f89a4680..d38437a1 100644 --- a/packages/grpc-js-xds/test/framework.ts +++ b/packages/grpc-js-xds/test/framework.ts @@ -70,7 +70,7 @@ export interface FakeCluster { } export class FakeEdsCluster implements FakeCluster { - constructor(private clusterName: string, private endpointName: string, private endpoints: Endpoint[], private loadBalancingPolicyOverride?: Any) {} + constructor(private clusterName: string, private endpointName: string, private endpoints: Endpoint[], private loadBalancingPolicyOverride?: Any | 'RING_HASH') {} getEndpointConfig(): ClusterLoadAssignment { return { @@ -94,7 +94,12 @@ export class FakeEdsCluster implements FakeCluster { ] } }; - if (this.loadBalancingPolicyOverride) { + if (this.loadBalancingPolicyOverride === 'RING_HASH') { + result.lb_policy = 'RING_HASH'; + result.ring_hash_lb_config = { + hash_function: 'XX_HASH' + }; + } else if (this.loadBalancingPolicyOverride) { result.load_balancing_policy = { policies: [ { @@ -257,8 +262,14 @@ function createRouteConfig(route: FakeRoute): Route { prefix: '' }, route: { - cluster: route.cluster.getName() - } + cluster: route.cluster.getName(), + // Default to consistent hash + hash_policy: [{ + filter_state: { + key: 'io.grpc.channel_id' + } + }] + }, }; } else { return { @@ -271,7 +282,13 @@ function createRouteConfig(route: FakeRoute): Route { name: clusterWeight.cluster.getName(), weight: {value: clusterWeight.weight} })) - } + }, + // Default to consistent hash + hash_policy: [{ + filter_state: { + key: 'io.grpc.channel_id' + } + }] } } } diff --git a/packages/grpc-js-xds/test/test-confg-parsing.ts b/packages/grpc-js-xds/test/test-confg-parsing.ts index 740934ba..f2065805 100644 --- a/packages/grpc-js-xds/test/test-confg-parsing.ts +++ b/packages/grpc-js-xds/test/test-confg-parsing.ts @@ -311,6 +311,37 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = { } } } + ], + ring_hash: [ + { + name: 'empty config', + input: {}, + output: { + min_ring_size: 1024, + max_ring_size: 4096 + } + }, + { + name: 'populated config', + input: { + min_ring_size: 2048, + max_ring_size: 8192 + } + }, + { + name: 'min_ring_size too large', + input: { + min_ring_size: 8_388_609 + }, + error: /min_ring_size/ + }, + { + name: 'max_ring_size too large', + input: { + max_ring_size: 8_388_609 + }, + error: /max_ring_size/ + } ] } diff --git a/packages/grpc-js-xds/test/test-ring-hash.ts b/packages/grpc-js-xds/test/test-ring-hash.ts new file mode 100644 index 00000000..0a9e893e --- /dev/null +++ b/packages/grpc-js-xds/test/test-ring-hash.ts @@ -0,0 +1,108 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Backend } from "./backend"; +import { XdsTestClient } from "./client"; +import { FakeEdsCluster, FakeRouteGroup } from "./framework"; +import { XdsServer } from "./xds-server"; + +import { register } from "../src"; +import assert = require("assert"); +import { Any } from "../src/generated/google/protobuf/Any"; +import { AnyExtension } from "@grpc/proto-loader"; +import { RingHash } from "../src/generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash"; + +register(); + +describe('Ring hash LB policy', () => { + let xdsServer: XdsServer; + let client: XdsTestClient; + beforeEach(done => { + xdsServer = new XdsServer(); + xdsServer.startServer(error => { + done(error); + }); + }); + afterEach(() => { + client?.close(); + xdsServer?.shutdownServer(); + }); + it('Should route requests to the single backend with the old lbPolicy field', done => { + const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], 'RING_HASH'); + const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + client.stopCalls(); + assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`); + } + }) + client = XdsTestClient.createFromServer('listener1', xdsServer); + client.sendOneCall(done); + }, reason => done(reason)); + }); + it('Should route requests to the single backend with the new load_balancing_policy field', done => { + const lbPolicy: AnyExtension & RingHash = { + '@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash', + hash_function: 'XX_HASH' + }; + const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy); + const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + client.stopCalls(); + assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`); + } + }) + client = XdsTestClient.createFromServer('listener1', xdsServer); + client.sendOneCall(done); + }, reason => done(reason)); + }); + it('Should route all identical requests to the same backend', done => { + const backend1 = new Backend(); + const backend2 = new Backend() + const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend1, backend2], locality:{region: 'region1'}}], 'RING_HASH'); + const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + client.stopCalls(); + assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`); + } + }) + client = XdsTestClient.createFromServer('listener1', xdsServer); + client.sendNCalls(10, error => { + assert.ifError(error); + assert((backend1.getCallCount() === 0) !== (backend2.getCallCount() === 0)); + done(); + }) + }, reason => done(reason)); + }); +}) diff --git a/packages/grpc-js-xds/test/xds-server.ts b/packages/grpc-js-xds/test/xds-server.ts index 6f021c17..2aa62bdd 100644 --- a/packages/grpc-js-xds/test/xds-server.ts +++ b/packages/grpc-js-xds/test/xds-server.ts @@ -44,6 +44,7 @@ const loadedProtos = loadPackageDefinition(loadSync( 'envoy/extensions/clusters/aggregate/v3/cluster.proto', 'envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto', 'envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto', + 'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto', 'xds/type/v3/typed_struct.proto' ], { diff --git a/packages/grpc-js-xds/tsconfig.json b/packages/grpc-js-xds/tsconfig.json index c121a5f6..24212dfc 100644 --- a/packages/grpc-js-xds/tsconfig.json +++ b/packages/grpc-js-xds/tsconfig.json @@ -3,8 +3,8 @@ "compilerOptions": { "rootDir": ".", "outDir": "build", - "target": "es2017", - "lib": ["es2017"], + "target": "es2020", + "lib": ["es2020"], "module": "commonjs", "incremental": true }, diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index f2bb8bcf..aa1e6c83 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -61,6 +61,7 @@ export interface ChannelOptions { * Set the enableTrace option in TLS clients and servers */ 'grpc-node.tls_enable_trace'?: number; + 'grpc.lb.ring_hash.ring_size_cap'?: number; // eslint-disable-next-line @typescript-eslint/no-explicit-any [key: string]: any; } @@ -96,6 +97,7 @@ export const recognizedOptions = { 'grpc.service_config_disable_resolution': true, 'grpc.client_idle_timeout_ms': true, 'grpc-node.tls_enable_trace': true, + 'grpc.lb.ring_hash.ring_size_cap': true, }; export function channelOptionsEqual( diff --git a/packages/grpc-js/src/experimental.ts b/packages/grpc-js/src/experimental.ts index 870fbe28..1e7a1e14 100644 --- a/packages/grpc-js/src/experimental.ts +++ b/packages/grpc-js/src/experimental.ts @@ -26,6 +26,7 @@ export { Endpoint, endpointToString, endpointHasAddress, + EndpointMap, } from './subchannel-address'; export { ChildLoadBalancerHandler } from './load-balancer-child-handler'; export { diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 2817201e..10d865ce 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -193,6 +193,15 @@ export class InternalChannel { private readonly callTracker = new ChannelzCallTracker(); private readonly childrenTracker = new ChannelzChildrenTracker(); + /** + * Randomly generated ID to be passed to the config selector, for use by + * ring_hash in xDS. An integer distributed approximately uniformly between + * 0 and MAX_SAFE_INTEGER. + */ + private readonly randomChannelId = Math.floor( + Math.random() * Number.MAX_SAFE_INTEGER + ); + constructor( target: string, private readonly credentials: ChannelCredentials, @@ -528,7 +537,7 @@ export class InternalChannel { if (this.configSelector) { return { type: 'SUCCESS', - config: this.configSelector(method, metadata), + config: this.configSelector(method, metadata, this.randomChannelId), }; } else { if (this.currentResolutionError) { diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 8e051361..8f2097f4 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -33,10 +33,9 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; import { PickArgs, Picker, PickResult, PickResultType } from './picker'; import { Endpoint, + EndpointMap, SubchannelAddress, - endpointHasAddress, endpointToString, - subchannelAddressEqual, } from './subchannel-address'; import { BaseSubchannelWrapper, @@ -461,126 +460,9 @@ interface MapEntry { subchannelWrappers: OutlierDetectionSubchannelWrapper[]; } -interface EndpointMapEntry { - key: Endpoint; - value: MapEntry; -} - -function endpointEqualUnordered( - endpoint1: Endpoint, - endpoint2: Endpoint -): boolean { - if (endpoint1.addresses.length !== endpoint2.addresses.length) { - return false; - } - for (const address1 of endpoint1.addresses) { - let matchFound = false; - for (const address2 of endpoint2.addresses) { - if (subchannelAddressEqual(address1, address2)) { - matchFound = true; - break; - } - } - if (!matchFound) { - return false; - } - } - return true; -} - -class EndpointMap { - private map: Set = new Set(); - - get size() { - return this.map.size; - } - - getForSubchannelAddress(address: SubchannelAddress): MapEntry | undefined { - for (const entry of this.map) { - if (endpointHasAddress(entry.key, address)) { - return entry.value; - } - } - return undefined; - } - - /** - * Delete any entries in this map with keys that are not in endpoints - * @param endpoints - */ - deleteMissing(endpoints: Endpoint[]) { - for (const entry of this.map) { - let foundEntry = false; - for (const endpoint of endpoints) { - if (endpointEqualUnordered(endpoint, entry.key)) { - foundEntry = true; - } - } - if (!foundEntry) { - this.map.delete(entry); - } - } - } - - get(endpoint: Endpoint): MapEntry | undefined { - for (const entry of this.map) { - if (endpointEqualUnordered(endpoint, entry.key)) { - return entry.value; - } - } - return undefined; - } - - set(endpoint: Endpoint, mapEntry: MapEntry) { - for (const entry of this.map) { - if (endpointEqualUnordered(endpoint, entry.key)) { - entry.value = mapEntry; - return; - } - } - this.map.add({ key: endpoint, value: mapEntry }); - } - - delete(endpoint: Endpoint) { - for (const entry of this.map) { - if (endpointEqualUnordered(endpoint, entry.key)) { - this.map.delete(entry); - return; - } - } - } - - has(endpoint: Endpoint): boolean { - for (const entry of this.map) { - if (endpointEqualUnordered(endpoint, entry.key)) { - return true; - } - } - return false; - } - - *keys(): IterableIterator { - for (const entry of this.map) { - yield entry.key; - } - } - - *values(): IterableIterator { - for (const entry of this.map) { - yield entry.value; - } - } - - *entries(): IterableIterator<[Endpoint, MapEntry]> { - for (const entry of this.map) { - yield [entry.key, entry.value]; - } - } -} - export class OutlierDetectionLoadBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; - private entryMap = new EndpointMap(); + private entryMap = new EndpointMap(); private latestConfig: OutlierDetectionLoadBalancingConfig | null = null; private ejectionTimer: NodeJS.Timeout; private timerStartTime: Date | null = null; diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 9af66266..5cbd11cf 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -541,6 +541,19 @@ export class LeafLoadBalancer { this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG); } + /** + * Update the endpoint associated with this LeafLoadBalancer to a new + * endpoint. Does not trigger connection establishment if a connection + * attempt is not already in progress. + * @param newEndpoint + */ + updateEndpoint(newEndpoint: Endpoint) { + this.endpoint = newEndpoint; + if (this.latestState !== ConnectivityState.IDLE) { + this.startConnecting(); + } + } + getConnectivityState() { return this.latestState; } diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index 6474269f..e0526f7a 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -17,9 +17,10 @@ import { StatusObject } from './call-interface'; import { Metadata } from './metadata'; -import { Status } from './constants'; +import { LogVerbosity, Status } from './constants'; import { LoadBalancer } from './load-balancer'; import { SubchannelInterface } from './subchannel-interface'; +import { trace } from './logging'; export enum PickResultType { COMPLETE, @@ -122,25 +123,40 @@ export class UnavailablePicker implements Picker { * indicating that the pick should be tried again with the next `Picker`. Also * reports back to the load balancer that a connection should be established * once any pick is attempted. + * If the childPicker is provided, delegate to it instead of returning the + * hardcoded QUEUE pick result, but still calls exitIdle. */ export class QueuePicker { private calledExitIdle = false; // Constructed with a load balancer. Calls exitIdle on it the first time pick is called - constructor(private loadBalancer: LoadBalancer) {} + constructor( + private loadBalancer: LoadBalancer, + private childPicker?: Picker + ) {} - pick(pickArgs: PickArgs): QueuePickResult { + pick(pickArgs: PickArgs): PickResult { + trace( + LogVerbosity.DEBUG, + 'picker', + 'Queue picker called for load balancer of type ' + + this.loadBalancer.constructor.name + ); if (!this.calledExitIdle) { process.nextTick(() => { this.loadBalancer.exitIdle(); }); this.calledExitIdle = true; } - return { - pickResultType: PickResultType.QUEUE, - subchannel: null, - status: null, - onCallStarted: null, - onCallEnded: null, - }; + if (this.childPicker) { + return this.childPicker.pick(pickArgs); + } else { + return { + pickResultType: PickResultType.QUEUE, + subchannel: null, + status: null, + onCallStarted: null, + onCallEnded: null, + }; + } } } diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 4dfa8d13..1c84c049 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -37,7 +37,7 @@ export interface CallConfig { * https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc */ export interface ConfigSelector { - (methodName: string, metadata: Metadata): CallConfig; + (methodName: string, metadata: Metadata, channelId: number): CallConfig; } /** diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index df480400..e600047e 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -279,7 +279,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { ); // Ensure that this.exitIdle() is called by the picker if (connectivityState === ConnectivityState.IDLE) { - picker = new QueuePicker(this); + picker = new QueuePicker(this, picker); } this.currentState = connectivityState; this.channelControlHelper.updateState(connectivityState, picker); diff --git a/packages/grpc-js/src/subchannel-address.ts b/packages/grpc-js/src/subchannel-address.ts index 36fd99ea..70a7962f 100644 --- a/packages/grpc-js/src/subchannel-address.ts +++ b/packages/grpc-js/src/subchannel-address.ts @@ -122,3 +122,127 @@ export function endpointHasAddress( } return false; } + +interface EndpointMapEntry { + key: Endpoint; + value: ValueType; +} + +function endpointEqualUnordered( + endpoint1: Endpoint, + endpoint2: Endpoint +): boolean { + if (endpoint1.addresses.length !== endpoint2.addresses.length) { + return false; + } + for (const address1 of endpoint1.addresses) { + let matchFound = false; + for (const address2 of endpoint2.addresses) { + if (subchannelAddressEqual(address1, address2)) { + matchFound = true; + break; + } + } + if (!matchFound) { + return false; + } + } + return true; +} + +export class EndpointMap { + private map: Set> = new Set(); + + get size() { + return this.map.size; + } + + getForSubchannelAddress(address: SubchannelAddress): ValueType | undefined { + for (const entry of this.map) { + if (endpointHasAddress(entry.key, address)) { + return entry.value; + } + } + return undefined; + } + + /** + * Delete any entries in this map with keys that are not in endpoints + * @param endpoints + */ + deleteMissing(endpoints: Endpoint[]): ValueType[] { + const removedValues: ValueType[] = []; + for (const entry of this.map) { + let foundEntry = false; + for (const endpoint of endpoints) { + if (endpointEqualUnordered(endpoint, entry.key)) { + foundEntry = true; + } + } + if (!foundEntry) { + removedValues.push(entry.value); + this.map.delete(entry); + } + } + return removedValues; + } + + get(endpoint: Endpoint): ValueType | undefined { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + return entry.value; + } + } + return undefined; + } + + set(endpoint: Endpoint, mapEntry: ValueType) { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + entry.value = mapEntry; + return; + } + } + this.map.add({ key: endpoint, value: mapEntry }); + } + + delete(endpoint: Endpoint) { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + this.map.delete(entry); + return; + } + } + } + + has(endpoint: Endpoint): boolean { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + return true; + } + } + return false; + } + + clear() { + this.map.clear(); + } + + *keys(): IterableIterator { + for (const entry of this.map) { + yield entry.key; + } + } + + *values(): IterableIterator { + for (const entry of this.map) { + yield entry.value; + } + } + + *entries(): IterableIterator<[Endpoint, ValueType]> { + for (const entry of this.map) { + yield [entry.key, entry.value]; + } + } +}