grpc-js-xds: Implement ring_hash LB policy

This commit is contained in:
Michael Lumish 2023-09-07 17:14:39 -07:00
parent 3096f22ba6
commit 3a43cba3a3
30 changed files with 1081 additions and 204 deletions

View File

@ -63,6 +63,7 @@ const compile = checkTask(() => execNpmCommand('compile'));
const runTests = checkTask(() => {
process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION = 'true';
process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG = 'true';
process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH = 'true';
return gulp.src(`${outDir}/test/**/*.js`)
.pipe(mocha({reporter: 'mocha-jenkins-reporter',
require: ['ts-node/register']}));

View File

@ -20,3 +20,4 @@ export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENA
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
export const EXPERIMENTAL_FEDERATION = (process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION ?? 'false') === 'true';
export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG ?? 'false') === 'true';
export const EXPERIMENTAL_RING_HASH = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH ?? 'false') === 'true';

View File

@ -116,7 +116,7 @@ export function validateTopLevelFilter(httpFilter: HttpFilter__Output): boolean
try {
typeUrl = getTopLevelFilterUrl(encodedConfig);
} catch (e) {
trace(httpFilter.name + ' validation failed with error ' + e.message);
trace(httpFilter.name + ' validation failed with error ' + (e as Error).message);
return false;
}
const registryEntry = FILTER_REGISTRY.get(typeUrl);
@ -243,4 +243,4 @@ export function createHttpFilter(config: HttpFilterConfig, overrideConfig?: Http
} else {
return null;
}
}
}

View File

@ -23,6 +23,7 @@ import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager';
import * as xds_wrr_locality from './load-balancer-xds-wrr-locality';
import * as ring_hash from './load-balancer-ring-hash';
import * as router_filter from './http-filter/router-filter';
import * as fault_injection_filter from './http-filter/fault-injection-filter';
import * as csds from './csds';
@ -41,6 +42,7 @@ export function register() {
load_balancer_weighted_target.setup();
load_balancer_xds_cluster_manager.setup();
xds_wrr_locality.setup();
ring_hash.setup();
router_filter.setup();
fault_injection_filter.setup();
csds.setup();

View File

@ -41,9 +41,26 @@ const DEFAULT_FAILOVER_TIME_MS = 10_000;
const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000;
export interface LocalityEndpoint extends Endpoint {
/**
* A sequence of strings that determines how to divide endpoints up in priority and
* weighted_target.
*/
localityPath: string[];
/**
* The locality this endpoint is in. Used in wrr_locality and xds_cluster_impl.
*/
locality: Locality__Output;
weight: number;
/**
* The load balancing weight for the entire locality that contains this
* endpoint. Used in xds_wrr_locality.
*/
localityWeight: number;
/**
* The overall load balancing weight for this endpoint, calculated as the
* product of the load balancing weight for this endpoint within its locality
* and the load balancing weight of the locality. Used in ring_hash.
*/
endpointWeight: number;
};
export function isLocalityEndpoint(
@ -317,7 +334,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
* so that when the picker calls exitIdle, that in turn calls exitIdle on
* the PriorityChildImpl, which will start the failover timer. */
if (state === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
picker = new QueuePicker(this, picker);
}
this.channelControlHelper.updateState(state, picker);
}

View File

@ -0,0 +1,507 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { experimental, logVerbosity, connectivityState, status, Metadata, ChannelOptions, LoadBalancingConfig } from '@grpc/grpc-js';
import { isLocalityEndpoint } from './load-balancer-priority';
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import LeafLoadBalancer = experimental.LeafLoadBalancer;
import Endpoint = experimental.Endpoint;
import Picker = experimental.Picker;
import PickArgs = experimental.PickArgs;
import PickResult = experimental.PickResult;
import PickResultType = experimental.PickResultType;
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import createChildChannelControlHelper = experimental.createChildChannelControlHelper;
import UnavailablePicker = experimental.UnavailablePicker;
import subchannelAddressToString = experimental.subchannelAddressToString;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import EndpointMap = experimental.EndpointMap;
import { loadXxhashApi, xxhashApi } from './xxhash';
import { EXPERIMENTAL_RING_HASH } from './environment';
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
import { RingHash__Output } from './generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash';
import { Any__Output } from './generated/google/protobuf/Any';
import { TypedExtensionConfig__Output } from './generated/envoy/config/core/v3/TypedExtensionConfig';
import { LoadBalancingPolicy__Output } from './generated/envoy/config/cluster/v3/LoadBalancingPolicy';
import { registerLbPolicy } from './lb-policy-registry';
const TRACER_NAME = 'ring_hash';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const TYPE_NAME = 'ring_hash';
const DEFAULT_MIN_RING_SIZE = 1024;
const DEFAULT_MAX_RING_SIZE = 4096;
const ABSOLUTE_MAX_RING_SIZE = 8_388_608;
const DEFAULT_RING_SIZE_CAP = 4096;
class RingHashLoadBalancingConfig implements TypedLoadBalancingConfig {
private minRingSize: number;
private maxRingSize: number;
constructor(minRingSize?: number, maxRingSize?: number) {
this.minRingSize = Math.min(
minRingSize ?? DEFAULT_MIN_RING_SIZE,
ABSOLUTE_MAX_RING_SIZE
);
this.maxRingSize = Math.min(
maxRingSize ?? DEFAULT_MAX_RING_SIZE,
ABSOLUTE_MAX_RING_SIZE
);
}
getLoadBalancerName(): string {
return TYPE_NAME;
}
toJsonObject(): object {
return {
[TYPE_NAME]: {
min_ring_size: this.minRingSize,
max_ring_size: this.maxRingSize,
}
};
}
getMinRingSize() {
return this.minRingSize;
}
getMaxRingSize() {
return this.maxRingSize;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
static createFromJson(obj: any): TypedLoadBalancingConfig {
if ('min_ring_size' in obj) {
if (typeof obj.min_ring_size === 'number') {
if (obj.min_ring_size > ABSOLUTE_MAX_RING_SIZE) {
throw new Error(`ring_hash config field min_ring_size exceeds the cap of ${ABSOLUTE_MAX_RING_SIZE}: ${obj.min_ring_size}`);
}
} else {
throw new Error(
'ring_hash config field min_ring_size must be a number if provided'
);
}
}
if ('max_ring_size' in obj) {
if (typeof obj.max_ring_size === 'number') {
if (obj.max_ring_size > ABSOLUTE_MAX_RING_SIZE) {
throw new Error(`ring_hash config field max_ring_size exceeds the cap of ${ABSOLUTE_MAX_RING_SIZE}: ${obj.max_ring_size}`);
}
} else {
throw new Error(
'ring_hash config field max_ring_size must be a number if provided'
);
}
}
return new RingHashLoadBalancingConfig(
obj.min_ring_size,
obj.max_ring_size
);
}
}
interface RingEntry {
leafBalancer: LeafLoadBalancer;
hash: bigint;
}
interface EndpointWeight {
endpoint: Endpoint;
weight: number;
normalizedWeight: number;
}
class RingHashPicker implements Picker {
constructor(private ring: RingEntry[]) {}
/**
* Find the least index in the ring with a hash greater than or equal to the
* hash parameter, or 0 if no such index exists.
* @param hash
*/
private findIndexForHash(hash: bigint): number {
// Binary search to find the target index
let low = 0;
let high = this.ring.length;
let index = 0;
while (low <= high) {
/* Commonly in binary search, this operation can overflow and result in
* the wrong value. However, in this case the ring size is absolutely
* limtied to 1<<23, so low+high < MAX_SAFE_INTEGER */
index = Math.floor((low + high) / 2);
if (index === this.ring.length) {
index = 0;
break;
}
const midval = this.ring[index].hash;
const midval1 = index === 0 ? 0n : this.ring[index - 1].hash;
if (hash <= midval && hash > midval1) {
break;
}
if (midval < hash) {
low = index + 1;
} else {
high = index - 1;
}
if (low > high) {
index = 0;
break;
}
}
return index;
}
pick(pickArgs: PickArgs): PickResult {
trace('Pick called. Hash=' + pickArgs.extraPickInfo.hash);
const firstIndex = this.findIndexForHash(
BigInt(pickArgs.extraPickInfo.hash)
);
for (let i = 0; i < this.ring.length; i++) {
const index = (firstIndex + i) % this.ring.length;
const entryState = this.ring[index].leafBalancer.getConnectivityState();
if (entryState === connectivityState.READY) {
return this.ring[index].leafBalancer.getPicker().pick(pickArgs);
}
if (entryState === connectivityState.IDLE) {
this.ring[index].leafBalancer.startConnecting();
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
}
if (entryState === connectivityState.CONNECTING) {
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
}
}
return {
pickResultType: PickResultType.TRANSIENT_FAILURE,
status: {
code: status.UNAVAILABLE,
details:
'ring_hash: invalid state: all child balancers in TRANSIENT_FAILURE',
metadata: new Metadata(),
},
subchannel: null,
onCallStarted: null,
onCallEnded: null,
};
}
}
class RingHashLoadBalancer implements LoadBalancer {
/**
* Tracks endpoint repetition across address updates, to use an appropriate
* existing leaf load balancer for the same endpoint when possible.
*/
private leafMap = new EndpointMap<LeafLoadBalancer>();
/**
* Tracks endpoints from a single address update, with their associated
* weights aggregated from all weights associated with that endpoint in that
* update.
*/
private leafWeightMap = new EndpointMap<number>();
private childChannelControlHelper: ChannelControlHelper;
private updatesPaused = false;
private currentState: connectivityState = connectivityState.IDLE;
private ring: RingEntry[] = [];
private ringHashSizeCap = DEFAULT_RING_SIZE_CAP;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {
this.childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
{
updateState: (state, picker) => {
this.calculateAndUpdateState();
/* If this LB policy is in the TRANSIENT_FAILURE state, requests will
* not trigger new connections, so we need to explicitly try connecting
* to other endpoints that are currently IDLE to try to eventually
* connect to something. */
if (
state === connectivityState.TRANSIENT_FAILURE &&
this.currentState === connectivityState.TRANSIENT_FAILURE
) {
for (const leaf of this.leafMap.values()) {
const leafState = leaf.getConnectivityState();
if (leafState === connectivityState.CONNECTING) {
break;
}
if (leafState === connectivityState.IDLE) {
leaf.startConnecting();
break;
}
}
}
},
}
);
if (options['grpc.lb.ring_hash.ring_size_cap'] !== undefined) {
this.ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap'];
}
}
private calculateAndUpdateState() {
if (this.updatesPaused) {
return;
}
const stateCounts = {
[connectivityState.READY]: 0,
[connectivityState.TRANSIENT_FAILURE]: 0,
[connectivityState.CONNECTING]: 0,
[connectivityState.IDLE]: 0,
[connectivityState.SHUTDOWN]: 0,
};
for (const leaf of this.leafMap.values()) {
stateCounts[leaf.getConnectivityState()] += 1;
}
if (stateCounts[connectivityState.READY] > 0) {
this.updateState(connectivityState.READY, new RingHashPicker(this.ring));
// REPORT READY
} else if (stateCounts[connectivityState.TRANSIENT_FAILURE] > 1) {
this.updateState(
connectivityState.TRANSIENT_FAILURE,
new UnavailablePicker()
);
} else if (stateCounts[connectivityState.CONNECTING] > 0) {
this.updateState(
connectivityState.CONNECTING,
new RingHashPicker(this.ring)
);
} else if (
stateCounts[connectivityState.TRANSIENT_FAILURE] > 0 &&
this.leafMap.size > 1
) {
this.updateState(
connectivityState.CONNECTING,
new RingHashPicker(this.ring)
);
} else if (stateCounts[connectivityState.IDLE] > 0) {
this.updateState(connectivityState.IDLE, new RingHashPicker(this.ring));
} else {
this.updateState(
connectivityState.TRANSIENT_FAILURE,
new UnavailablePicker()
);
}
}
private updateState(newState: connectivityState, picker: Picker) {
trace(
connectivityState[this.currentState] +
' -> ' +
connectivityState[newState]
);
this.currentState = newState;
this.channelControlHelper.updateState(newState, picker);
}
private constructRing(
endpointList: Endpoint[],
config: RingHashLoadBalancingConfig
) {
this.ring = [];
const endpointWeights: EndpointWeight[] = [];
let weightSum = 0;
for (const endpoint of endpointList) {
const weight = this.leafWeightMap.get(endpoint) ?? 1;
endpointWeights.push({ endpoint, weight, normalizedWeight: 0 });
weightSum += weight;
}
/* The normalized weights sum to 1, with some small potential error due to
* the limitation of floating point precision. */
let minNormalizedWeight = 1;
for (const endpointWeight of endpointWeights) {
endpointWeight.normalizedWeight = endpointWeight.weight / weightSum;
minNormalizedWeight = Math.min(
endpointWeight.normalizedWeight,
minNormalizedWeight
);
}
const minRingSize = Math.min(config.getMinRingSize(), this.ringHashSizeCap);
const maxRingSize = Math.min(config.getMaxRingSize(), this.ringHashSizeCap);
/* Calculate a scale factor that meets the following conditions:
* 1. The result is between minRingSize and maxRingSize, inclusive
* 2. The smallest normalized weight is scaled to a whole number, if it
* does not violate the previous condition.
* The size of the ring is ceil(scale)
*/
const scale = Math.min(
Math.ceil(minNormalizedWeight * minRingSize) / minNormalizedWeight,
maxRingSize
);
trace('Creating a ring with size ' + Math.ceil(scale));
/* For each endpoint, create a number of entries proportional to its
* weight, such that the total number of entries is equal to ceil(scale).
*/
let currentHashes = 0;
let targetHashes = 0;
for (const endpointWeight of endpointWeights) {
const addressString = subchannelAddressToString(
endpointWeight.endpoint.addresses[0]
);
targetHashes += scale * endpointWeight.normalizedWeight;
const leafBalancer = this.leafMap.get(endpointWeight.endpoint);
if (!leafBalancer) {
throw new Error(
'ring_hash: Invalid state: endpoint found in leafWeightMap but not in leafMap'
);
}
let count = 0;
while (currentHashes < targetHashes) {
const hashKey = `${addressString}_${count}`;
const hash = xxhashApi!.h64(hashKey, 0n);
this.ring.push({ hash, leafBalancer });
currentHashes++;
count++;
}
}
/* The ring is sorted by the hash so that it can be efficiently searched
* for a hash that is closest to any arbitrary hash. */
this.ring.sort((a, b) => {
if (a.hash > b.hash) {
return 1;
} else if (a.hash < b.hash) {
return -1;
} else {
return 0;
}
});
}
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
if (!(lbConfig instanceof RingHashLoadBalancingConfig)) {
trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
this.updatesPaused = true;
this.leafWeightMap.clear();
const dedupedEndpointList: Endpoint[] = [];
for (const endpoint of endpointList) {
const leafBalancer = this.leafMap.get(endpoint);
if (leafBalancer) {
leafBalancer.updateEndpoint(endpoint);
} else {
this.leafMap.set(
endpoint,
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, this.options)
);
}
const weight = this.leafWeightMap.get(endpoint);
if (weight === undefined) {
dedupedEndpointList.push(endpoint);
}
this.leafWeightMap.set(endpoint, (weight ?? 0) + (isLocalityEndpoint(endpoint) ? endpoint.endpointWeight : 1));
}
const removedLeaves = this.leafMap.deleteMissing(endpointList);
for (const leaf of removedLeaves) {
leaf.destroy();
}
loadXxhashApi().then(() => {
this.constructRing(dedupedEndpointList, lbConfig);
this.updatesPaused = false;
this.calculateAndUpdateState();
});
}
exitIdle(): void {
/* This operation does not make sense here. We don't want to make the whole
* balancer exit idle, and instead propagate that to individual chlidren as
* relevant. */
}
resetBackoff(): void {
// There is no backoff to reset here
}
destroy(): void {
this.ring = [];
for (const child of this.leafMap.values()) {
child.destroy();
}
this.leafMap.clear();
this.leafWeightMap.clear();
}
getTypeName(): string {
return TYPE_NAME;
}
}
const RING_HASH_TYPE_URL = 'type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash';
const resourceRoot = loadProtosWithOptionsSync([
'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto'], {
keepCase: true,
includeDirs: [
// Paths are relative to src/build
__dirname + '/../../deps/envoy-api/',
__dirname + '/../../deps/xds/',
__dirname + '/../../deps/protoc-gen-validate'
],
}
);
const toObjectOptions = {
longs: String,
enums: String,
defaults: true,
oneofs: true
}
function decodeRingHash(message: Any__Output): RingHash__Output {
const name = message.type_url.substring(message.type_url.lastIndexOf('/') + 1);
const type = resourceRoot.lookup(name);
if (type) {
const decodedMessage = (type as any).decode(message.value);
return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as RingHash__Output;
} else {
throw new Error(`TypedStruct parsing error: unexpected type URL ${message.type_url}`);
}
}
function convertToLoadBalancingPolicy(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig {
if (protoPolicy.typed_config?.type_url !== RING_HASH_TYPE_URL) {
throw new Error(`Ring Hash LB policy parsing error: unexpected type URL ${protoPolicy.typed_config?.type_url}`);
}
const ringHashMessage = decodeRingHash(protoPolicy.typed_config);
if (ringHashMessage.hash_function !== 'XX_HASH') {
throw new Error(`Ring Hash LB policy parsing error: unexpected hash function ${ringHashMessage.hash_function}`);
}
return {
[TYPE_NAME]: {
min_ring_size: ringHashMessage.minimum_ring_size?.value ?? 1024,
max_ring_size: ringHashMessage.maximum_ring_size?.value ?? 8_388_608
}
};
}
export function setup() {
if (EXPERIMENTAL_RING_HASH) {
registerLoadBalancerType(
TYPE_NAME,
RingHashLoadBalancer,
RingHashLoadBalancingConfig
);
registerLbPolicy(RING_HASH_TYPE_URL, convertToLoadBalancingPolicy);
}
}

View File

@ -204,35 +204,7 @@ class XdsClusterManager implements LoadBalancer {
} else {
connectivityState = ConnectivityState.TRANSIENT_FAILURE;
}
/* For each of the states CONNECTING, IDLE, and TRANSIENT_FAILURE, there is
* exactly one corresponding picker, so if the state is one of those and
* that does not change, no new information is provided by passing the
* new state upward. */
if (connectivityState === this.currentState && connectivityState !== ConnectivityState.READY) {
return;
}
let picker: Picker;
switch (connectivityState) {
case ConnectivityState.READY:
picker = new XdsClusterManagerPicker(pickerMap);
break;
case ConnectivityState.CONNECTING:
case ConnectivityState.IDLE:
picker = new QueuePicker(this);
break;
default:
picker = new UnavailablePicker({
code: Status.UNAVAILABLE,
details: 'xds_cluster_manager: all children report state TRANSIENT_FAILURE',
metadata: new Metadata()
});
}
trace(
'Transitioning to ' +
ConnectivityState[connectivityState]
);
this.channelControlHelper.updateState(connectivityState, picker);
this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap));
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {

View File

@ -115,10 +115,15 @@ class XdsClusterResolverLoadBalancingConfig implements TypedLoadBalancingConfig
}
}
interface WeightedEndpoint {
endpoint: Endpoint;
weight: number;
}
interface LocalityEntry {
locality: Locality__Output;
weight: number;
endpoints: Endpoint[];
endpoints: WeightedEndpoint[];
}
interface PriorityEntry {
@ -166,16 +171,19 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt
if (!endpoint.load_balancing_weight) {
continue;
}
const endpoints: Endpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map(
const endpoints: WeightedEndpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map(
(lbEndpoint) => {
/* The validator in the XdsClient class ensures that each endpoint has
* a socket_address with an IP address and a port_value. */
const socketAddress = lbEndpoint.endpoint!.address!.socket_address!;
return {
addresses: [{
host: socketAddress.address!,
port: socketAddress.port_value!,
}]
endpoint: {
addresses: [{
host: socketAddress.address!,
port: socketAddress.port_value!,
}]
},
weight: lbEndpoint.load_balancing_weight?.value ?? 1
};
}
);
@ -211,7 +219,7 @@ function getDnsPriorities(endpoints: Endpoint[]): PriorityEntry[] {
sub_zone: ''
},
weight: 1,
endpoints: endpoints
endpoints: endpoints.map(endpoint => ({endpoint: endpoint, weight: 1}))
}],
dropCategories: []
}];
@ -295,15 +303,16 @@ export class XdsClusterResolver implements LoadBalancer {
newPriorityNames[priority] = newPriorityName;
for (const localityObj of priorityEntry.localities) {
for (const endpoint of localityObj.endpoints) {
for (const weightedEndpoint of localityObj.endpoints) {
endpointList.push({
localityPath: [
newPriorityName,
localityToName(localityObj.locality),
],
locality: localityObj.locality,
weight: localityObj.weight,
...endpoint
localityWeight: localityObj.weight,
endpointWeight: localityObj.weight * weightedEndpoint.weight,
...weightedEndpoint.endpoint
});
}
newLocalityPriorities.set(localityToName(localityObj.locality), priority);

View File

@ -90,7 +90,7 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
if (!(localityName in targets)) {
targets[localityName] = {
child_policy: lbConfig.getChildPolicy(),
weight: address.weight
weight: address.localityWeight
};
}
}

View File

@ -71,7 +71,7 @@ export class SafeRegexValueMatcher implements ValueMatcher {
const numberRegex = new RE2(/^-?\d+$/u);
export class RangeValueMatcher implements ValueMatcher {
constructor(private start: BigInt, private end: BigInt) {}
constructor(private start: bigint, private end: bigint) {}
apply(value: string) {
if (!numberRegex.test(value)) {
@ -264,4 +264,4 @@ export class FullMatcher implements Matcher {
headers: ${this.headerMatchers.map(matcher => matcher.toString()).join('\n\t')}
fraction: ${this.fraction ? fractionToString(this.fraction): 'none'}`;
}
}
}

View File

@ -34,18 +34,19 @@ import { HeaderMatcher__Output } from './generated/envoy/config/route/v3/HeaderM
import ConfigSelector = experimental.ConfigSelector;
import { ContainsValueMatcher, ExactValueMatcher, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher';
import { envoyFractionToFraction, Fraction } from "./fraction";
import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action';
import { HashPolicy, RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action';
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resources';
import Duration = experimental.Duration;
import { Duration__Output } from './generated/google/protobuf/Duration';
import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY } from './environment';
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY, EXPERIMENTAL_RING_HASH } from './environment';
import Filter = experimental.Filter;
import FilterFactory = experimental.FilterFactory;
import { BootstrapInfo, loadBootstrapInfo, validateBootstrapConfig } from './xds-bootstrap';
import { ListenerResourceType } from './xds-resource-type/listener-resource-type';
import { RouteConfigurationResourceType } from './xds-resource-type/route-config-resource-type';
import { protoDurationToDuration } from './duration';
import { loadXxhashApi } from './xxhash';
const TRACER_NAME = 'xds_resolver';
@ -381,7 +382,11 @@ class XdsResolver implements Resolver {
}
}
private handleRouteConfig(routeConfig: RouteConfiguration__Output) {
private async handleRouteConfig(routeConfig: RouteConfiguration__Output) {
/* We need to load the xxhash API before this function finishes, because
* it is invoked in the config selector, which can be called immediately
* after this function returns. */
await loadXxhashApi();
this.latestRouteConfig = routeConfig;
/* Select the virtual host using the default authority override if it
* exists, and the channel target otherwise. */
@ -456,6 +461,26 @@ class XdsResolver implements Resolver {
}
}
}
const hashPolicies: HashPolicy[] = [];
if (EXPERIMENTAL_RING_HASH) {
for (const routeHashPolicy of route.route!.hash_policy) {
if (routeHashPolicy.policy_specifier === 'header') {
const headerPolicy = routeHashPolicy.header!;
hashPolicies.push({
type: 'HEADER',
terminal: routeHashPolicy.terminal,
headerName: headerPolicy.header_name,
regex: headerPolicy.regex_rewrite?.pattern ? new RE2(headerPolicy.regex_rewrite.pattern.regex, 'ug') : undefined,
regexSubstitution: headerPolicy.regex_rewrite?.substitution
});
} else if (routeHashPolicy.policy_specifier === 'filter_state' && routeHashPolicy.filter_state!.key === 'io.grpc.channel_id') {
hashPolicies.push({
type: 'CHANNEL_ID',
terminal: routeHashPolicy.terminal
});
}
}
}
switch (route.route!.cluster_specifier) {
case 'cluster_header':
continue;
@ -483,7 +508,7 @@ class XdsResolver implements Resolver {
}
}
}
routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories);
routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories, hashPolicies);
break;
}
case 'weighted_clusters': {
@ -525,7 +550,7 @@ class XdsResolver implements Resolver {
}
weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories});
}
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy});
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy}, hashPolicies);
break;
}
default:
@ -554,7 +579,7 @@ class XdsResolver implements Resolver {
this.clusterRefcounts.set(name, {inLastConfig: true, refCount: 0});
}
}
const configSelector: ConfigSelector = (methodName, metadata) => {
const configSelector: ConfigSelector = (methodName, metadata, channelId) => {
for (const {matcher, action} of matchList) {
if (matcher.apply(methodName, metadata)) {
const clusterResult = action.getCluster();
@ -562,10 +587,11 @@ class XdsResolver implements Resolver {
const onCommitted = () => {
this.unrefCluster(clusterResult.name);
}
const hash = action.getHash(metadata, channelId);
return {
methodConfig: clusterResult.methodConfig,
onCommitted: onCommitted,
pickInformation: {cluster: clusterResult.name},
pickInformation: {cluster: clusterResult.name, hash: `${hash}`},
status: status.OK,
dynamicFilterFactories: clusterResult.dynamicFilterFactories
};
@ -573,8 +599,8 @@ class XdsResolver implements Resolver {
}
return {
methodConfig: {name: []},
// cluster won't be used here, but it's set because of some TypeScript weirdness
pickInformation: {cluster: ''},
// These fields won't be used here, but they're set because of some TypeScript weirdness
pickInformation: {cluster: '', hash: ''},
status: status.UNAVAILABLE,
dynamicFilterFactories: []
};

View File

@ -14,10 +14,12 @@
* limitations under the License.
*/
import { MethodConfig, experimental } from '@grpc/grpc-js';
import { Metadata, MethodConfig, experimental } from '@grpc/grpc-js';
import Duration = experimental.Duration;
import Filter = experimental.Filter;
import FilterFactory = experimental.FilterFactory;
import { RE2 } from 're2-wasm';
import { xxhashApi } from './xxhash';
export interface ClusterResult {
name: string;
@ -28,6 +30,7 @@ export interface ClusterResult {
export interface RouteAction {
toString(): string;
getCluster(): ClusterResult;
getHash(metadata: Metadata, channelId: number): bigint;
}
function durationToLogString(duration: Duration) {
@ -39,8 +42,83 @@ function durationToLogString(duration: Duration) {
}
}
export interface HashPolicy {
type: 'HEADER' | 'CHANNEL_ID';
terminal: boolean;
headerName?: string;
regex?: RE2;
regexSubstitution?: string;
}
/**
* Must be called only after xxhash.loadXxhashApi() resolves.
* @param hashPolicies
* @param metadata
* @param channelId
*/
function getHash(hashPolicies: HashPolicy[], metadata: Metadata, channelId: number): bigint {
let hash: bigint | null = null;
for (const policy of hashPolicies) {
let newHash: bigint | null = null;
switch (policy.type) {
case 'CHANNEL_ID':
newHash = xxhashApi!.h64(`${channelId}`, 0n);
break;
case 'HEADER': {
if (!policy.headerName) {
break;
}
if (policy.headerName.endsWith('-bin')) {
break;
}
let headerString: string;
if (policy.headerName === 'content-type') {
headerString = 'application/grpc';
} else {
const headerValues = metadata.get(policy.headerName);
if (headerValues.length === 0) {
break;
}
headerString = headerValues.join(',');
}
let rewrittenHeaderString = headerString;
if (policy.regex && policy.regexSubstitution) {
/* The JS string replace method uses $-prefixed patterns to produce
* other strings. See
* https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String/replace#specifying_a_string_as_the_replacement
* RE2-based regex substitutions use \n where n is a number to refer
* to capture group n, and they otherwise have no special replacement
* patterns. See
* https://github.com/envoyproxy/envoy/blob/2443032526cf6e50d63d35770df9473dd0460fc0/api/envoy/type/matcher/v3/regex.proto#L79-L87
* We convert an RE2 regex substitution into a string substitution by
* first replacing each "$" with "$$" (which produces "$" in the
* output), and then replace each "\n" for any whole number n with
* "$n". */
const regexSubstitution = policy.regexSubstitution.replace(/\$/g, '$$$$').replace(/\\(\d+)/g, '$$$1');
rewrittenHeaderString = headerString.replace(policy.regex, regexSubstitution);
}
newHash = xxhashApi!.h64(rewrittenHeaderString, 0n);
break;
}
}
if (hash === null) {
hash = newHash;
} else if (newHash !== null) {
hash = ((hash << 1n) | (hash >> 63n)) ^ newHash;
}
if (policy.terminal && hash !== null) {
break;
}
}
if (hash === null) {
return xxhashApi!.h64(`${Math.random()}`, 0n);
} else {
return hash;
}
}
export class SingleClusterRouteAction implements RouteAction {
constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory<Filter>[]) {}
constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory<Filter>[], private hashPolicies: HashPolicy[]) {}
getCluster() {
return {
@ -50,6 +128,10 @@ export class SingleClusterRouteAction implements RouteAction {
};
}
getHash(metadata: Metadata, channelId: number): bigint {
return getHash(this.hashPolicies, metadata, channelId);
}
toString() {
return 'SingleCluster(' + this.cluster + ', ' + JSON.stringify(this.methodConfig) + ')';
}
@ -72,7 +154,7 @@ export class WeightedClusterRouteAction implements RouteAction {
* The weighted cluster choices represented as a CDF
*/
private clusterChoices: ClusterChoice[];
constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig) {
constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig, private hashPolicies: HashPolicy[]) {
this.clusterChoices = [];
let lastNumerator = 0;
for (const clusterWeight of clusters) {
@ -96,6 +178,10 @@ export class WeightedClusterRouteAction implements RouteAction {
return {name: '', methodConfig: this.methodConfig, dynamicFilterFactories: []};
}
getHash(metadata: Metadata, channelId: number): bigint {
return getHash(this.hashPolicies, metadata, channelId);
}
toString() {
const clusterListString = this.clusters.map(({name, weight}) => '(' + name + ':' + weight + ')').join(', ')
return 'WeightedCluster(' + clusterListString + ', ' + JSON.stringify(this.methodConfig) + ')';

View File

@ -357,14 +357,14 @@ export function loadBootstrapInfo(): BootstrapInfo {
try {
rawBootstrap = fs.readFileSync(bootstrapPath, { encoding: 'utf8'});
} catch (e) {
throw new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${e.message}`);
throw new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${(e as Error).message}`);
}
try {
const parsedFile = JSON.parse(rawBootstrap);
loadedBootstrapInfo = validateBootstrapConfig(parsedFile);
return loadedBootstrapInfo;
} catch (e) {
throw new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}`)
throw new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${(e as Error).message}`)
}
}
@ -383,7 +383,7 @@ export function loadBootstrapInfo(): BootstrapInfo {
loadedBootstrapInfo = validateBootstrapConfig(parsedConfig);
} catch (e) {
throw new Error(
`Failed to parse xDS bootstrap config from environment variable GRPC_XDS_BOOTSTRAP_CONFIG with error ${e.message}`
`Failed to parse xDS bootstrap config from environment variable GRPC_XDS_BOOTSTRAP_CONFIG with error ${(e as Error).message}`
);
}

View File

@ -208,14 +208,14 @@ class AdsResponseParser {
try {
decodeResult = this.result.type.decode(decodeContext, resource);
} catch (e) {
this.result.errors.push(`${errorPrefix} ${e.message}`);
this.result.errors.push(`${errorPrefix} ${(e as Error).message}`);
return;
}
let parsedName: XdsResourceName;
try {
parsedName = parseXdsResourceName(decodeResult.name, this.result.type!.getTypeUrl());
} catch (e) {
this.result.errors.push(`${errorPrefix} ${e.message}`);
this.result.errors.push(`${errorPrefix} ${(e as Error).message}`);
return;
}
this.adsCallState.typeStates.get(this.result.type!)?.subscribedResources.get(parsedName.authority)?.get(parsedName.key)?.markSeen();

View File

@ -21,7 +21,7 @@ import { LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js";
import { XdsServerConfig } from "../xds-bootstrap";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { OutlierDetection__Output } from "../generated/envoy/config/cluster/v3/OutlierDetection";
import { EXPERIMENTAL_CUSTOM_LB_CONFIG, EXPERIMENTAL_OUTLIER_DETECTION } from "../environment";
import { EXPERIMENTAL_CUSTOM_LB_CONFIG, EXPERIMENTAL_OUTLIER_DETECTION, EXPERIMENTAL_RING_HASH } from "../environment";
import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster";
import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value";
import { Any__Output } from "../generated/google/protobuf/Any";
@ -150,6 +150,27 @@ export class ClusterResourceType extends XdsResourceType {
child_policy: [{round_robin: {}}]
}
};
} else if(EXPERIMENTAL_RING_HASH && message.lb_policy === 'RING_HASH') {
if (!message.ring_hash_lb_config) {
return null;
}
if (message.ring_hash_lb_config.hash_function !== 'XX_HASH') {
return null;
}
const minRingSize = message.ring_hash_lb_config.minimum_ring_size ? Number(message.ring_hash_lb_config.minimum_ring_size.value) : 1024;
if (minRingSize > 8_388_608) {
return null;
}
const maxRingSize = message.ring_hash_lb_config.maximum_ring_size ? Number(message.ring_hash_lb_config.maximum_ring_size.value) : 8_388_608;
if (maxRingSize > 8_388_608) {
return null;
}
lbPolicyConfig = {
ring_hash: {
min_ring_size: minRingSize,
max_ring_size: maxRingSize
}
};
} else {
return null;
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/* The simpler `import xxhash from 'xxhash-wasm';` doesn't compile correctly
* to CommonJS require calls for some reason, so we use this import to get
* the type, and then an explicit require call to get the actual value. */
import xxhashImport from 'xxhash-wasm';
const xxhash: typeof xxhashImport = require('xxhash-wasm');
export let xxhashApi: Awaited<ReturnType<typeof xxhash>> | null = null;
export async function loadXxhashApi() {
if (!xxhashApi) {
xxhashApi = await xxhash();
}
return xxhashApi;
}

View File

@ -70,7 +70,7 @@ export interface FakeCluster {
}
export class FakeEdsCluster implements FakeCluster {
constructor(private clusterName: string, private endpointName: string, private endpoints: Endpoint[], private loadBalancingPolicyOverride?: Any) {}
constructor(private clusterName: string, private endpointName: string, private endpoints: Endpoint[], private loadBalancingPolicyOverride?: Any | 'RING_HASH') {}
getEndpointConfig(): ClusterLoadAssignment {
return {
@ -94,7 +94,12 @@ export class FakeEdsCluster implements FakeCluster {
]
}
};
if (this.loadBalancingPolicyOverride) {
if (this.loadBalancingPolicyOverride === 'RING_HASH') {
result.lb_policy = 'RING_HASH';
result.ring_hash_lb_config = {
hash_function: 'XX_HASH'
};
} else if (this.loadBalancingPolicyOverride) {
result.load_balancing_policy = {
policies: [
{
@ -257,8 +262,14 @@ function createRouteConfig(route: FakeRoute): Route {
prefix: ''
},
route: {
cluster: route.cluster.getName()
}
cluster: route.cluster.getName(),
// Default to consistent hash
hash_policy: [{
filter_state: {
key: 'io.grpc.channel_id'
}
}]
},
};
} else {
return {
@ -271,7 +282,13 @@ function createRouteConfig(route: FakeRoute): Route {
name: clusterWeight.cluster.getName(),
weight: {value: clusterWeight.weight}
}))
}
},
// Default to consistent hash
hash_policy: [{
filter_state: {
key: 'io.grpc.channel_id'
}
}]
}
}
}

View File

@ -311,6 +311,37 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = {
}
}
}
],
ring_hash: [
{
name: 'empty config',
input: {},
output: {
min_ring_size: 1024,
max_ring_size: 4096
}
},
{
name: 'populated config',
input: {
min_ring_size: 2048,
max_ring_size: 8192
}
},
{
name: 'min_ring_size too large',
input: {
min_ring_size: 8_388_609
},
error: /min_ring_size/
},
{
name: 'max_ring_size too large',
input: {
max_ring_size: 8_388_609
},
error: /max_ring_size/
}
]
}

View File

@ -0,0 +1,108 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { Backend } from "./backend";
import { XdsTestClient } from "./client";
import { FakeEdsCluster, FakeRouteGroup } from "./framework";
import { XdsServer } from "./xds-server";
import { register } from "../src";
import assert = require("assert");
import { Any } from "../src/generated/google/protobuf/Any";
import { AnyExtension } from "@grpc/proto-loader";
import { RingHash } from "../src/generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash";
register();
describe('Ring hash LB policy', () => {
let xdsServer: XdsServer;
let client: XdsTestClient;
beforeEach(done => {
xdsServer = new XdsServer();
xdsServer.startServer(error => {
done(error);
});
});
afterEach(() => {
client?.close();
xdsServer?.shutdownServer();
});
it('Should route requests to the single backend with the old lbPolicy field', done => {
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], 'RING_HASH');
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
});
it('Should route requests to the single backend with the new load_balancing_policy field', done => {
const lbPolicy: AnyExtension & RingHash = {
'@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash',
hash_function: 'XX_HASH'
};
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
});
it('Should route all identical requests to the same backend', done => {
const backend1 = new Backend();
const backend2 = new Backend()
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend1, backend2], locality:{region: 'region1'}}], 'RING_HASH');
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendNCalls(10, error => {
assert.ifError(error);
assert((backend1.getCallCount() === 0) !== (backend2.getCallCount() === 0));
done();
})
}, reason => done(reason));
});
})

View File

@ -44,6 +44,7 @@ const loadedProtos = loadPackageDefinition(loadSync(
'envoy/extensions/clusters/aggregate/v3/cluster.proto',
'envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto',
'envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto',
'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto',
'xds/type/v3/typed_struct.proto'
],
{

View File

@ -3,8 +3,8 @@
"compilerOptions": {
"rootDir": ".",
"outDir": "build",
"target": "es2017",
"lib": ["es2017"],
"target": "es2020",
"lib": ["es2020"],
"module": "commonjs",
"incremental": true
},

View File

@ -61,6 +61,7 @@ export interface ChannelOptions {
* Set the enableTrace option in TLS clients and servers
*/
'grpc-node.tls_enable_trace'?: number;
'grpc.lb.ring_hash.ring_size_cap'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
}
@ -96,6 +97,7 @@ export const recognizedOptions = {
'grpc.service_config_disable_resolution': true,
'grpc.client_idle_timeout_ms': true,
'grpc-node.tls_enable_trace': true,
'grpc.lb.ring_hash.ring_size_cap': true,
};
export function channelOptionsEqual(

View File

@ -26,6 +26,7 @@ export {
Endpoint,
endpointToString,
endpointHasAddress,
EndpointMap,
} from './subchannel-address';
export { ChildLoadBalancerHandler } from './load-balancer-child-handler';
export {

View File

@ -193,6 +193,15 @@ export class InternalChannel {
private readonly callTracker = new ChannelzCallTracker();
private readonly childrenTracker = new ChannelzChildrenTracker();
/**
* Randomly generated ID to be passed to the config selector, for use by
* ring_hash in xDS. An integer distributed approximately uniformly between
* 0 and MAX_SAFE_INTEGER.
*/
private readonly randomChannelId = Math.floor(
Math.random() * Number.MAX_SAFE_INTEGER
);
constructor(
target: string,
private readonly credentials: ChannelCredentials,
@ -528,7 +537,7 @@ export class InternalChannel {
if (this.configSelector) {
return {
type: 'SUCCESS',
config: this.configSelector(method, metadata),
config: this.configSelector(method, metadata, this.randomChannelId),
};
} else {
if (this.currentResolutionError) {

View File

@ -33,10 +33,9 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { PickArgs, Picker, PickResult, PickResultType } from './picker';
import {
Endpoint,
EndpointMap,
SubchannelAddress,
endpointHasAddress,
endpointToString,
subchannelAddressEqual,
} from './subchannel-address';
import {
BaseSubchannelWrapper,
@ -461,126 +460,9 @@ interface MapEntry {
subchannelWrappers: OutlierDetectionSubchannelWrapper[];
}
interface EndpointMapEntry {
key: Endpoint;
value: MapEntry;
}
function endpointEqualUnordered(
endpoint1: Endpoint,
endpoint2: Endpoint
): boolean {
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
return false;
}
for (const address1 of endpoint1.addresses) {
let matchFound = false;
for (const address2 of endpoint2.addresses) {
if (subchannelAddressEqual(address1, address2)) {
matchFound = true;
break;
}
}
if (!matchFound) {
return false;
}
}
return true;
}
class EndpointMap {
private map: Set<EndpointMapEntry> = new Set();
get size() {
return this.map.size;
}
getForSubchannelAddress(address: SubchannelAddress): MapEntry | undefined {
for (const entry of this.map) {
if (endpointHasAddress(entry.key, address)) {
return entry.value;
}
}
return undefined;
}
/**
* Delete any entries in this map with keys that are not in endpoints
* @param endpoints
*/
deleteMissing(endpoints: Endpoint[]) {
for (const entry of this.map) {
let foundEntry = false;
for (const endpoint of endpoints) {
if (endpointEqualUnordered(endpoint, entry.key)) {
foundEntry = true;
}
}
if (!foundEntry) {
this.map.delete(entry);
}
}
}
get(endpoint: Endpoint): MapEntry | undefined {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return entry.value;
}
}
return undefined;
}
set(endpoint: Endpoint, mapEntry: MapEntry) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
entry.value = mapEntry;
return;
}
}
this.map.add({ key: endpoint, value: mapEntry });
}
delete(endpoint: Endpoint) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
this.map.delete(entry);
return;
}
}
}
has(endpoint: Endpoint): boolean {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return true;
}
}
return false;
}
*keys(): IterableIterator<Endpoint> {
for (const entry of this.map) {
yield entry.key;
}
}
*values(): IterableIterator<MapEntry> {
for (const entry of this.map) {
yield entry.value;
}
}
*entries(): IterableIterator<[Endpoint, MapEntry]> {
for (const entry of this.map) {
yield [entry.key, entry.value];
}
}
}
export class OutlierDetectionLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private entryMap = new EndpointMap();
private entryMap = new EndpointMap<MapEntry>();
private latestConfig: OutlierDetectionLoadBalancingConfig | null = null;
private ejectionTimer: NodeJS.Timeout;
private timerStartTime: Date | null = null;

View File

@ -541,6 +541,19 @@ export class LeafLoadBalancer {
this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG);
}
/**
* Update the endpoint associated with this LeafLoadBalancer to a new
* endpoint. Does not trigger connection establishment if a connection
* attempt is not already in progress.
* @param newEndpoint
*/
updateEndpoint(newEndpoint: Endpoint) {
this.endpoint = newEndpoint;
if (this.latestState !== ConnectivityState.IDLE) {
this.startConnecting();
}
}
getConnectivityState() {
return this.latestState;
}

View File

@ -17,9 +17,10 @@
import { StatusObject } from './call-interface';
import { Metadata } from './metadata';
import { Status } from './constants';
import { LogVerbosity, Status } from './constants';
import { LoadBalancer } from './load-balancer';
import { SubchannelInterface } from './subchannel-interface';
import { trace } from './logging';
export enum PickResultType {
COMPLETE,
@ -122,25 +123,40 @@ export class UnavailablePicker implements Picker {
* indicating that the pick should be tried again with the next `Picker`. Also
* reports back to the load balancer that a connection should be established
* once any pick is attempted.
* If the childPicker is provided, delegate to it instead of returning the
* hardcoded QUEUE pick result, but still calls exitIdle.
*/
export class QueuePicker {
private calledExitIdle = false;
// Constructed with a load balancer. Calls exitIdle on it the first time pick is called
constructor(private loadBalancer: LoadBalancer) {}
constructor(
private loadBalancer: LoadBalancer,
private childPicker?: Picker
) {}
pick(pickArgs: PickArgs): QueuePickResult {
pick(pickArgs: PickArgs): PickResult {
trace(
LogVerbosity.DEBUG,
'picker',
'Queue picker called for load balancer of type ' +
this.loadBalancer.constructor.name
);
if (!this.calledExitIdle) {
process.nextTick(() => {
this.loadBalancer.exitIdle();
});
this.calledExitIdle = true;
}
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
if (this.childPicker) {
return this.childPicker.pick(pickArgs);
} else {
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
}
}
}

View File

@ -37,7 +37,7 @@ export interface CallConfig {
* https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc
*/
export interface ConfigSelector {
(methodName: string, metadata: Metadata): CallConfig;
(methodName: string, metadata: Metadata, channelId: number): CallConfig;
}
/**

View File

@ -279,7 +279,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
);
// Ensure that this.exitIdle() is called by the picker
if (connectivityState === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
picker = new QueuePicker(this, picker);
}
this.currentState = connectivityState;
this.channelControlHelper.updateState(connectivityState, picker);

View File

@ -122,3 +122,127 @@ export function endpointHasAddress(
}
return false;
}
interface EndpointMapEntry<ValueType> {
key: Endpoint;
value: ValueType;
}
function endpointEqualUnordered(
endpoint1: Endpoint,
endpoint2: Endpoint
): boolean {
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
return false;
}
for (const address1 of endpoint1.addresses) {
let matchFound = false;
for (const address2 of endpoint2.addresses) {
if (subchannelAddressEqual(address1, address2)) {
matchFound = true;
break;
}
}
if (!matchFound) {
return false;
}
}
return true;
}
export class EndpointMap<ValueType> {
private map: Set<EndpointMapEntry<ValueType>> = new Set();
get size() {
return this.map.size;
}
getForSubchannelAddress(address: SubchannelAddress): ValueType | undefined {
for (const entry of this.map) {
if (endpointHasAddress(entry.key, address)) {
return entry.value;
}
}
return undefined;
}
/**
* Delete any entries in this map with keys that are not in endpoints
* @param endpoints
*/
deleteMissing(endpoints: Endpoint[]): ValueType[] {
const removedValues: ValueType[] = [];
for (const entry of this.map) {
let foundEntry = false;
for (const endpoint of endpoints) {
if (endpointEqualUnordered(endpoint, entry.key)) {
foundEntry = true;
}
}
if (!foundEntry) {
removedValues.push(entry.value);
this.map.delete(entry);
}
}
return removedValues;
}
get(endpoint: Endpoint): ValueType | undefined {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return entry.value;
}
}
return undefined;
}
set(endpoint: Endpoint, mapEntry: ValueType) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
entry.value = mapEntry;
return;
}
}
this.map.add({ key: endpoint, value: mapEntry });
}
delete(endpoint: Endpoint) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
this.map.delete(entry);
return;
}
}
}
has(endpoint: Endpoint): boolean {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return true;
}
}
return false;
}
clear() {
this.map.clear();
}
*keys(): IterableIterator<Endpoint> {
for (const entry of this.map) {
yield entry.key;
}
}
*values(): IterableIterator<ValueType> {
for (const entry of this.map) {
yield entry.value;
}
}
*entries(): IterableIterator<[Endpoint, ValueType]> {
for (const entry of this.map) {
yield [entry.key, entry.value];
}
}
}