diff --git a/packages/grpc-js-xds/gulpfile.ts b/packages/grpc-js-xds/gulpfile.ts index 6f17a402..93f93d8f 100644 --- a/packages/grpc-js-xds/gulpfile.ts +++ b/packages/grpc-js-xds/gulpfile.ts @@ -63,6 +63,9 @@ const compile = checkTask(() => execNpmCommand('compile')); const runTests = checkTask(() => { process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION = 'true'; process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG = 'true'; + if (Number(process.versions.node.split('.')[0]) > 14) { + process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH = 'true'; + } return gulp.src(`${outDir}/test/**/*.js`) .pipe(mocha({reporter: 'mocha-jenkins-reporter', require: ['ts-node/register']})); diff --git a/packages/grpc-js-xds/interop/Dockerfile b/packages/grpc-js-xds/interop/Dockerfile index 63a32106..6239b5f2 100644 --- a/packages/grpc-js-xds/interop/Dockerfile +++ b/packages/grpc-js-xds/interop/Dockerfile @@ -33,6 +33,6 @@ COPY --from=build /node/src/grpc-node/packages/grpc-js ./packages/grpc-js/ COPY --from=build /node/src/grpc-node/packages/grpc-js-xds ./packages/grpc-js-xds/ ENV GRPC_VERBOSITY="DEBUG" -ENV GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager,cds_balancer,xds_cluster_resolver,xds_cluster_impl,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds,outlier_detection,server,server_call +ENV GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager,cds_balancer,xds_cluster_resolver,xds_cluster_impl,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds,outlier_detection,server,server_call,ring_hash ENTRYPOINT [ "/nodejs/bin/node", "/node/src/grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client" ] diff --git a/packages/grpc-js-xds/interop/xds-interop-client.ts b/packages/grpc-js-xds/interop/xds-interop-client.ts index f26278ab..1fdaa3a6 100644 --- a/packages/grpc-js-xds/interop/xds-interop-client.ts +++ b/packages/grpc-js-xds/interop/xds-interop-client.ts @@ -467,9 +467,11 @@ function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpc makeSingleRequest(client, callType, failOnFailedRpcs, callStatsTracker, callStartTimestampsTrackers[callType]); } }, 1000/qps); - setInterval(() => { - console.log(`Accumulated stats: ${JSON.stringify(accumulatedStats, undefined, 2)}`); - }, 1000); + if (VERBOSITY >= 2) { + setInterval(() => { + console.log(`Accumulated stats: ${JSON.stringify(accumulatedStats, undefined, 2)}`); + }, 1000); + } } const callTypeEnumMap = { diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index b43304ce..6ec4548d 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -12,7 +12,7 @@ "prepare": "npm run compile", "pretest": "npm run compile", "posttest": "npm run check", - "generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto", + "generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto", "generate-interop-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O interop/generated --grpcLib @grpc/grpc-js grpc/testing/test.proto", "generate-test-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O test/generated --grpcLib @grpc/grpc-js grpc/testing/echo.proto" }, @@ -38,15 +38,16 @@ "@types/mocha": "^5.2.6", "@types/node": "^13.11.1", "@types/yargs": "^15.0.5", - "gts": "^2.0.2", - "typescript": "^3.8.3", + "gts": "^5.0.1", + "typescript": "^4.9.5", "yargs": "^15.4.1" }, "dependencies": { "@grpc/proto-loader": "^0.6.0", "google-auth-library": "^7.0.2", "re2-wasm": "^1.0.1", - "vscode-uri": "^3.0.7" + "vscode-uri": "^3.0.7", + "xxhash-wasm": "^1.0.2" }, "peerDependencies": { "@grpc/grpc-js": "~1.8.0" diff --git a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh index a1e24ff1..e4a0cf21 100755 --- a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh +++ b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh @@ -166,6 +166,7 @@ main() { cd "${TEST_DRIVER_FULL_DIR}" local failed_tests=0 test_suites=( + "affinity_test" "api_listener_test" "baseline_test" "change_backend_service_test" diff --git a/packages/grpc-js-xds/src/environment.ts b/packages/grpc-js-xds/src/environment.ts index 530bb256..85890311 100644 --- a/packages/grpc-js-xds/src/environment.ts +++ b/packages/grpc-js-xds/src/environment.ts @@ -20,3 +20,4 @@ export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENA export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true'; export const EXPERIMENTAL_FEDERATION = (process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION ?? 'false') === 'true'; export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG ?? 'false') === 'true'; +export const EXPERIMENTAL_RING_HASH = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH ?? 'false') === 'true'; diff --git a/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/common/v3/ConsistentHashingLbConfig.ts b/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/common/v3/ConsistentHashingLbConfig.ts new file mode 100644 index 00000000..c216720f --- /dev/null +++ b/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/common/v3/ConsistentHashingLbConfig.ts @@ -0,0 +1,67 @@ +// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/common/v3/common.proto + +import type { UInt32Value as _google_protobuf_UInt32Value, UInt32Value__Output as _google_protobuf_UInt32Value__Output } from '../../../../../google/protobuf/UInt32Value'; + +/** + * Common Configuration for all consistent hashing load balancers (MaglevLb, RingHashLb, etc.) + */ +export interface ConsistentHashingLbConfig { + /** + * If set to ``true``, the cluster will use hostname instead of the resolved + * address as the key to consistently hash to an upstream host. Only valid for StrictDNS clusters with hostnames which resolve to a single IP address. + */ + 'use_hostname_for_hashing'?: (boolean); + /** + * Configures percentage of average cluster load to bound per upstream host. For example, with a value of 150 + * no upstream host will get a load more than 1.5 times the average load of all the hosts in the cluster. + * If not specified, the load is not bounded for any upstream host. Typical value for this parameter is between 120 and 200. + * Minimum is 100. + * + * Applies to both Ring Hash and Maglev load balancers. + * + * This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350. For the specified + * ``hash_balance_factor``, requests to any upstream host are capped at ``hash_balance_factor/100`` times the average number of requests + * across the cluster. When a request arrives for an upstream host that is currently serving at its max capacity, linear probing + * is used to identify an eligible host. Further, the linear probe is implemented using a random jump in hosts ring/table to identify + * the eligible host (this technique is as described in the paper https://arxiv.org/abs/1908.08762 - the random jump avoids the + * cascading overflow effect when choosing the next host in the ring/table). + * + * If weights are specified on the hosts, they are respected. + * + * This is an O(N) algorithm, unlike other load balancers. Using a lower ``hash_balance_factor`` results in more hosts + * being probed, so use a higher value if you require better performance. + */ + 'hash_balance_factor'?: (_google_protobuf_UInt32Value | null); +} + +/** + * Common Configuration for all consistent hashing load balancers (MaglevLb, RingHashLb, etc.) + */ +export interface ConsistentHashingLbConfig__Output { + /** + * If set to ``true``, the cluster will use hostname instead of the resolved + * address as the key to consistently hash to an upstream host. Only valid for StrictDNS clusters with hostnames which resolve to a single IP address. + */ + 'use_hostname_for_hashing': (boolean); + /** + * Configures percentage of average cluster load to bound per upstream host. For example, with a value of 150 + * no upstream host will get a load more than 1.5 times the average load of all the hosts in the cluster. + * If not specified, the load is not bounded for any upstream host. Typical value for this parameter is between 120 and 200. + * Minimum is 100. + * + * Applies to both Ring Hash and Maglev load balancers. + * + * This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350. For the specified + * ``hash_balance_factor``, requests to any upstream host are capped at ``hash_balance_factor/100`` times the average number of requests + * across the cluster. When a request arrives for an upstream host that is currently serving at its max capacity, linear probing + * is used to identify an eligible host. Further, the linear probe is implemented using a random jump in hosts ring/table to identify + * the eligible host (this technique is as described in the paper https://arxiv.org/abs/1908.08762 - the random jump avoids the + * cascading overflow effect when choosing the next host in the ring/table). + * + * If weights are specified on the hosts, they are respected. + * + * This is an O(N) algorithm, unlike other load balancers. Using a lower ``hash_balance_factor`` results in more hosts + * being probed, so use a higher value if you require better performance. + */ + 'hash_balance_factor': (_google_protobuf_UInt32Value__Output | null); +} diff --git a/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/common/v3/LocalityLbConfig.ts b/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/common/v3/LocalityLbConfig.ts new file mode 100644 index 00000000..d6fecd36 --- /dev/null +++ b/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/common/v3/LocalityLbConfig.ts @@ -0,0 +1,101 @@ +// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/common/v3/common.proto + +import type { Percent as _envoy_type_v3_Percent, Percent__Output as _envoy_type_v3_Percent__Output } from '../../../../../envoy/type/v3/Percent'; +import type { UInt64Value as _google_protobuf_UInt64Value, UInt64Value__Output as _google_protobuf_UInt64Value__Output } from '../../../../../google/protobuf/UInt64Value'; +import type { Long } from '@grpc/proto-loader'; + +/** + * Configuration for :ref:`locality weighted load balancing + * ` + */ +export interface _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig { +} + +/** + * Configuration for :ref:`locality weighted load balancing + * ` + */ +export interface _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig__Output { +} + +/** + * Configuration for :ref:`zone aware routing + * `. + */ +export interface _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_ZoneAwareLbConfig { + /** + * Configures percentage of requests that will be considered for zone aware routing + * if zone aware routing is configured. If not specified, the default is 100%. + * * :ref:`runtime values `. + * * :ref:`Zone aware routing support `. + */ + 'routing_enabled'?: (_envoy_type_v3_Percent | null); + /** + * Configures minimum upstream cluster size required for zone aware routing + * If upstream cluster size is less than specified, zone aware routing is not performed + * even if zone aware routing is configured. If not specified, the default is 6. + * * :ref:`runtime values `. + * * :ref:`Zone aware routing support `. + */ + 'min_cluster_size'?: (_google_protobuf_UInt64Value | null); + /** + * If set to true, Envoy will not consider any hosts when the cluster is in :ref:`panic + * mode`. Instead, the cluster will fail all + * requests as if all hosts are unhealthy. This can help avoid potentially overwhelming a + * failing service. + */ + 'fail_traffic_on_panic'?: (boolean); +} + +/** + * Configuration for :ref:`zone aware routing + * `. + */ +export interface _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_ZoneAwareLbConfig__Output { + /** + * Configures percentage of requests that will be considered for zone aware routing + * if zone aware routing is configured. If not specified, the default is 100%. + * * :ref:`runtime values `. + * * :ref:`Zone aware routing support `. + */ + 'routing_enabled': (_envoy_type_v3_Percent__Output | null); + /** + * Configures minimum upstream cluster size required for zone aware routing + * If upstream cluster size is less than specified, zone aware routing is not performed + * even if zone aware routing is configured. If not specified, the default is 6. + * * :ref:`runtime values `. + * * :ref:`Zone aware routing support `. + */ + 'min_cluster_size': (_google_protobuf_UInt64Value__Output | null); + /** + * If set to true, Envoy will not consider any hosts when the cluster is in :ref:`panic + * mode`. Instead, the cluster will fail all + * requests as if all hosts are unhealthy. This can help avoid potentially overwhelming a + * failing service. + */ + 'fail_traffic_on_panic': (boolean); +} + +export interface LocalityLbConfig { + /** + * Configuration for local zone aware load balancing. + */ + 'zone_aware_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_ZoneAwareLbConfig | null); + /** + * Enable locality weighted load balancing. + */ + 'locality_weighted_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig | null); + 'locality_config_specifier'?: "zone_aware_lb_config"|"locality_weighted_lb_config"; +} + +export interface LocalityLbConfig__Output { + /** + * Configuration for local zone aware load balancing. + */ + 'zone_aware_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_ZoneAwareLbConfig__Output | null); + /** + * Enable locality weighted load balancing. + */ + 'locality_weighted_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig__Output | null); + 'locality_config_specifier': "zone_aware_lb_config"|"locality_weighted_lb_config"; +} diff --git a/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/common/v3/SlowStartConfig.ts b/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/common/v3/SlowStartConfig.ts new file mode 100644 index 00000000..bc222ff8 --- /dev/null +++ b/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/common/v3/SlowStartConfig.ts @@ -0,0 +1,71 @@ +// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/common/v3/common.proto + +import type { Duration as _google_protobuf_Duration, Duration__Output as _google_protobuf_Duration__Output } from '../../../../../google/protobuf/Duration'; +import type { RuntimeDouble as _envoy_config_core_v3_RuntimeDouble, RuntimeDouble__Output as _envoy_config_core_v3_RuntimeDouble__Output } from '../../../../../envoy/config/core/v3/RuntimeDouble'; +import type { Percent as _envoy_type_v3_Percent, Percent__Output as _envoy_type_v3_Percent__Output } from '../../../../../envoy/type/v3/Percent'; + +/** + * Configuration for :ref:`slow start mode `. + */ +export interface SlowStartConfig { + /** + * Represents the size of slow start window. + * If set, the newly created host remains in slow start mode starting from its creation time + * for the duration of slow start window. + */ + 'slow_start_window'?: (_google_protobuf_Duration | null); + /** + * This parameter controls the speed of traffic increase over the slow start window. Defaults to 1.0, + * so that endpoint would get linearly increasing amount of traffic. + * When increasing the value for this parameter, the speed of traffic ramp-up increases non-linearly. + * The value of aggression parameter should be greater than 0.0. + * By tuning the parameter, is possible to achieve polynomial or exponential shape of ramp-up curve. + * + * During slow start window, effective weight of an endpoint would be scaled with time factor and aggression: + * ``new_weight = weight * max(min_weight_percent, time_factor ^ (1 / aggression))``, + * where ``time_factor=(time_since_start_seconds / slow_start_time_seconds)``. + * + * As time progresses, more and more traffic would be sent to endpoint, which is in slow start window. + * Once host exits slow start, time_factor and aggression no longer affect its weight. + */ + 'aggression'?: (_envoy_config_core_v3_RuntimeDouble | null); + /** + * Configures the minimum percentage of origin weight that avoids too small new weight, + * which may cause endpoints in slow start mode receive no traffic in slow start window. + * If not specified, the default is 10%. + */ + 'min_weight_percent'?: (_envoy_type_v3_Percent | null); +} + +/** + * Configuration for :ref:`slow start mode `. + */ +export interface SlowStartConfig__Output { + /** + * Represents the size of slow start window. + * If set, the newly created host remains in slow start mode starting from its creation time + * for the duration of slow start window. + */ + 'slow_start_window': (_google_protobuf_Duration__Output | null); + /** + * This parameter controls the speed of traffic increase over the slow start window. Defaults to 1.0, + * so that endpoint would get linearly increasing amount of traffic. + * When increasing the value for this parameter, the speed of traffic ramp-up increases non-linearly. + * The value of aggression parameter should be greater than 0.0. + * By tuning the parameter, is possible to achieve polynomial or exponential shape of ramp-up curve. + * + * During slow start window, effective weight of an endpoint would be scaled with time factor and aggression: + * ``new_weight = weight * max(min_weight_percent, time_factor ^ (1 / aggression))``, + * where ``time_factor=(time_since_start_seconds / slow_start_time_seconds)``. + * + * As time progresses, more and more traffic would be sent to endpoint, which is in slow start window. + * Once host exits slow start, time_factor and aggression no longer affect its weight. + */ + 'aggression': (_envoy_config_core_v3_RuntimeDouble__Output | null); + /** + * Configures the minimum percentage of origin weight that avoids too small new weight, + * which may cause endpoints in slow start mode receive no traffic in slow start window. + * If not specified, the default is 10%. + */ + 'min_weight_percent': (_envoy_type_v3_Percent__Output | null); +} diff --git a/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash.ts b/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash.ts new file mode 100644 index 00000000..4e2a7303 --- /dev/null +++ b/packages/grpc-js-xds/src/generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash.ts @@ -0,0 +1,163 @@ +// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto + +import type { UInt64Value as _google_protobuf_UInt64Value, UInt64Value__Output as _google_protobuf_UInt64Value__Output } from '../../../../../google/protobuf/UInt64Value'; +import type { UInt32Value as _google_protobuf_UInt32Value, UInt32Value__Output as _google_protobuf_UInt32Value__Output } from '../../../../../google/protobuf/UInt32Value'; +import type { ConsistentHashingLbConfig as _envoy_extensions_load_balancing_policies_common_v3_ConsistentHashingLbConfig, ConsistentHashingLbConfig__Output as _envoy_extensions_load_balancing_policies_common_v3_ConsistentHashingLbConfig__Output } from '../../../../../envoy/extensions/load_balancing_policies/common/v3/ConsistentHashingLbConfig'; +import type { _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig, _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig__Output } from '../../../../../envoy/extensions/load_balancing_policies/common/v3/LocalityLbConfig'; +import type { Long } from '@grpc/proto-loader'; + +// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto + +/** + * The hash function used to hash hosts onto the ketama ring. + */ +export enum _envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_HashFunction { + /** + * Currently defaults to XX_HASH. + */ + DEFAULT_HASH = 0, + /** + * Use `xxHash `_. + */ + XX_HASH = 1, + /** + * Use `MurmurHash2 `_, this is compatible with + * std:hash in GNU libstdc++ 3.4.20 or above. This is typically the case when compiled + * on Linux and not macOS. + */ + MURMUR_HASH_2 = 2, +} + +/** + * This configuration allows the built-in RING_HASH LB policy to be configured via the LB policy + * extension point. See the :ref:`load balancing architecture overview + * ` for more information. + * [#next-free-field: 8] + */ +export interface RingHash { + /** + * The hash function used to hash hosts onto the ketama ring. The value defaults to + * :ref:`XX_HASH`. + */ + 'hash_function'?: (_envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_HashFunction | keyof typeof _envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_HashFunction); + /** + * Minimum hash ring size. The larger the ring is (that is, the more hashes there are for each + * provided host) the better the request distribution will reflect the desired weights. Defaults + * to 1024 entries, and limited to 8M entries. See also + * :ref:`maximum_ring_size`. + */ + 'minimum_ring_size'?: (_google_protobuf_UInt64Value | null); + /** + * Maximum hash ring size. Defaults to 8M entries, and limited to 8M entries, but can be lowered + * to further constrain resource use. See also + * :ref:`minimum_ring_size`. + */ + 'maximum_ring_size'?: (_google_protobuf_UInt64Value | null); + /** + * If set to `true`, the cluster will use hostname instead of the resolved + * address as the key to consistently hash to an upstream host. Only valid for StrictDNS clusters with hostnames which resolve to a single IP address. + * + * ..note:: + * This is deprecated and please use :ref:`consistent_hashing_lb_config + * ` instead. + */ + 'use_hostname_for_hashing'?: (boolean); + /** + * Configures percentage of average cluster load to bound per upstream host. For example, with a value of 150 + * no upstream host will get a load more than 1.5 times the average load of all the hosts in the cluster. + * If not specified, the load is not bounded for any upstream host. Typical value for this parameter is between 120 and 200. + * Minimum is 100. + * + * This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350. For the specified + * `hash_balance_factor`, requests to any upstream host are capped at `hash_balance_factor/100` times the average number of requests + * across the cluster. When a request arrives for an upstream host that is currently serving at its max capacity, linear probing + * is used to identify an eligible host. Further, the linear probe is implemented using a random jump in hosts ring/table to identify + * the eligible host (this technique is as described in the paper https://arxiv.org/abs/1908.08762 - the random jump avoids the + * cascading overflow effect when choosing the next host in the ring/table). + * + * If weights are specified on the hosts, they are respected. + * + * This is an O(N) algorithm, unlike other load balancers. Using a lower `hash_balance_factor` results in more hosts + * being probed, so use a higher value if you require better performance. + * + * ..note:: + * This is deprecated and please use :ref:`consistent_hashing_lb_config + * ` instead. + */ + 'hash_balance_factor'?: (_google_protobuf_UInt32Value | null); + /** + * Common configuration for hashing-based load balancing policies. + */ + 'consistent_hashing_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_ConsistentHashingLbConfig | null); + /** + * Enable locality weighted load balancing for ring hash lb explicitly. + */ + 'locality_weighted_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig | null); +} + +/** + * This configuration allows the built-in RING_HASH LB policy to be configured via the LB policy + * extension point. See the :ref:`load balancing architecture overview + * ` for more information. + * [#next-free-field: 8] + */ +export interface RingHash__Output { + /** + * The hash function used to hash hosts onto the ketama ring. The value defaults to + * :ref:`XX_HASH`. + */ + 'hash_function': (keyof typeof _envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_HashFunction); + /** + * Minimum hash ring size. The larger the ring is (that is, the more hashes there are for each + * provided host) the better the request distribution will reflect the desired weights. Defaults + * to 1024 entries, and limited to 8M entries. See also + * :ref:`maximum_ring_size`. + */ + 'minimum_ring_size': (_google_protobuf_UInt64Value__Output | null); + /** + * Maximum hash ring size. Defaults to 8M entries, and limited to 8M entries, but can be lowered + * to further constrain resource use. See also + * :ref:`minimum_ring_size`. + */ + 'maximum_ring_size': (_google_protobuf_UInt64Value__Output | null); + /** + * If set to `true`, the cluster will use hostname instead of the resolved + * address as the key to consistently hash to an upstream host. Only valid for StrictDNS clusters with hostnames which resolve to a single IP address. + * + * ..note:: + * This is deprecated and please use :ref:`consistent_hashing_lb_config + * ` instead. + */ + 'use_hostname_for_hashing': (boolean); + /** + * Configures percentage of average cluster load to bound per upstream host. For example, with a value of 150 + * no upstream host will get a load more than 1.5 times the average load of all the hosts in the cluster. + * If not specified, the load is not bounded for any upstream host. Typical value for this parameter is between 120 and 200. + * Minimum is 100. + * + * This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350. For the specified + * `hash_balance_factor`, requests to any upstream host are capped at `hash_balance_factor/100` times the average number of requests + * across the cluster. When a request arrives for an upstream host that is currently serving at its max capacity, linear probing + * is used to identify an eligible host. Further, the linear probe is implemented using a random jump in hosts ring/table to identify + * the eligible host (this technique is as described in the paper https://arxiv.org/abs/1908.08762 - the random jump avoids the + * cascading overflow effect when choosing the next host in the ring/table). + * + * If weights are specified on the hosts, they are respected. + * + * This is an O(N) algorithm, unlike other load balancers. Using a lower `hash_balance_factor` results in more hosts + * being probed, so use a higher value if you require better performance. + * + * ..note:: + * This is deprecated and please use :ref:`consistent_hashing_lb_config + * ` instead. + */ + 'hash_balance_factor': (_google_protobuf_UInt32Value__Output | null); + /** + * Common configuration for hashing-based load balancing policies. + */ + 'consistent_hashing_lb_config': (_envoy_extensions_load_balancing_policies_common_v3_ConsistentHashingLbConfig__Output | null); + /** + * Enable locality weighted load balancing for ring hash lb explicitly. + */ + 'locality_weighted_lb_config': (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig__Output | null); +} diff --git a/packages/grpc-js-xds/src/generated/google/protobuf/FieldOptions.ts b/packages/grpc-js-xds/src/generated/google/protobuf/FieldOptions.ts index d62db88d..f59acfbf 100644 --- a/packages/grpc-js-xds/src/generated/google/protobuf/FieldOptions.ts +++ b/packages/grpc-js-xds/src/generated/google/protobuf/FieldOptions.ts @@ -2,7 +2,6 @@ import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption'; import type { FieldRules as _validate_FieldRules, FieldRules__Output as _validate_FieldRules__Output } from '../../validate/FieldRules'; -import type { FieldSecurityAnnotation as _udpa_annotations_FieldSecurityAnnotation, FieldSecurityAnnotation__Output as _udpa_annotations_FieldSecurityAnnotation__Output } from '../../udpa/annotations/FieldSecurityAnnotation'; import type { FieldMigrateAnnotation as _udpa_annotations_FieldMigrateAnnotation, FieldMigrateAnnotation__Output as _udpa_annotations_FieldMigrateAnnotation__Output } from '../../udpa/annotations/FieldMigrateAnnotation'; import type { FieldStatusAnnotation as _xds_annotations_v3_FieldStatusAnnotation, FieldStatusAnnotation__Output as _xds_annotations_v3_FieldStatusAnnotation__Output } from '../../xds/annotations/v3/FieldStatusAnnotation'; @@ -31,8 +30,6 @@ export interface FieldOptions { 'weak'?: (boolean); 'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[]; '.validate.rules'?: (_validate_FieldRules | null); - '.udpa.annotations.security'?: (_udpa_annotations_FieldSecurityAnnotation | null); - '.udpa.annotations.sensitive'?: (boolean); '.envoy.annotations.deprecated_at_minor_version'?: (string); '.udpa.annotations.field_migrate'?: (_udpa_annotations_FieldMigrateAnnotation | null); '.envoy.annotations.disallowed_by_default'?: (boolean); @@ -48,8 +45,6 @@ export interface FieldOptions__Output { 'weak': (boolean); 'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[]; '.validate.rules': (_validate_FieldRules__Output | null); - '.udpa.annotations.security': (_udpa_annotations_FieldSecurityAnnotation__Output | null); - '.udpa.annotations.sensitive': (boolean); '.envoy.annotations.deprecated_at_minor_version': (string); '.udpa.annotations.field_migrate': (_udpa_annotations_FieldMigrateAnnotation__Output | null); '.envoy.annotations.disallowed_by_default': (boolean); diff --git a/packages/grpc-js-xds/src/generated/ring_hash.ts b/packages/grpc-js-xds/src/generated/ring_hash.ts new file mode 100644 index 00000000..d298067d --- /dev/null +++ b/packages/grpc-js-xds/src/generated/ring_hash.ts @@ -0,0 +1,172 @@ +import type * as grpc from '@grpc/grpc-js'; +import type { EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader'; + + +type SubtypeConstructor any, Subtype> = { + new(...args: ConstructorParameters): Subtype; +}; + +export interface ProtoGrpcType { + envoy: { + annotations: { + } + config: { + core: { + v3: { + Address: MessageTypeDefinition + AsyncDataSource: MessageTypeDefinition + BackoffStrategy: MessageTypeDefinition + BindConfig: MessageTypeDefinition + BuildVersion: MessageTypeDefinition + CidrRange: MessageTypeDefinition + ControlPlane: MessageTypeDefinition + DataSource: MessageTypeDefinition + EnvoyInternalAddress: MessageTypeDefinition + Extension: MessageTypeDefinition + ExtraSourceAddress: MessageTypeDefinition + HeaderMap: MessageTypeDefinition + HeaderValue: MessageTypeDefinition + HeaderValueOption: MessageTypeDefinition + HttpUri: MessageTypeDefinition + Locality: MessageTypeDefinition + Metadata: MessageTypeDefinition + Node: MessageTypeDefinition + Pipe: MessageTypeDefinition + QueryParameter: MessageTypeDefinition + RemoteDataSource: MessageTypeDefinition + RequestMethod: EnumTypeDefinition + RetryPolicy: MessageTypeDefinition + RoutingPriority: EnumTypeDefinition + RuntimeDouble: MessageTypeDefinition + RuntimeFeatureFlag: MessageTypeDefinition + RuntimeFractionalPercent: MessageTypeDefinition + RuntimePercent: MessageTypeDefinition + RuntimeUInt32: MessageTypeDefinition + SocketAddress: MessageTypeDefinition + SocketOption: MessageTypeDefinition + SocketOptionsOverride: MessageTypeDefinition + TcpKeepalive: MessageTypeDefinition + TrafficDirection: EnumTypeDefinition + TransportSocket: MessageTypeDefinition + WatchedDirectory: MessageTypeDefinition + } + } + } + extensions: { + load_balancing_policies: { + common: { + v3: { + ConsistentHashingLbConfig: MessageTypeDefinition + LocalityLbConfig: MessageTypeDefinition + SlowStartConfig: MessageTypeDefinition + } + } + ring_hash: { + v3: { + RingHash: MessageTypeDefinition + } + } + } + } + type: { + v3: { + FractionalPercent: MessageTypeDefinition + Percent: MessageTypeDefinition + SemanticVersion: MessageTypeDefinition + } + } + } + google: { + protobuf: { + Any: MessageTypeDefinition + BoolValue: MessageTypeDefinition + BytesValue: MessageTypeDefinition + DescriptorProto: MessageTypeDefinition + DoubleValue: MessageTypeDefinition + Duration: MessageTypeDefinition + EnumDescriptorProto: MessageTypeDefinition + EnumOptions: MessageTypeDefinition + EnumValueDescriptorProto: MessageTypeDefinition + EnumValueOptions: MessageTypeDefinition + FieldDescriptorProto: MessageTypeDefinition + FieldOptions: MessageTypeDefinition + FileDescriptorProto: MessageTypeDefinition + FileDescriptorSet: MessageTypeDefinition + FileOptions: MessageTypeDefinition + FloatValue: MessageTypeDefinition + GeneratedCodeInfo: MessageTypeDefinition + Int32Value: MessageTypeDefinition + Int64Value: MessageTypeDefinition + ListValue: MessageTypeDefinition + MessageOptions: MessageTypeDefinition + MethodDescriptorProto: MessageTypeDefinition + MethodOptions: MessageTypeDefinition + NullValue: EnumTypeDefinition + OneofDescriptorProto: MessageTypeDefinition + OneofOptions: MessageTypeDefinition + ServiceDescriptorProto: MessageTypeDefinition + ServiceOptions: MessageTypeDefinition + SourceCodeInfo: MessageTypeDefinition + StringValue: MessageTypeDefinition + Struct: MessageTypeDefinition + Timestamp: MessageTypeDefinition + UInt32Value: MessageTypeDefinition + UInt64Value: MessageTypeDefinition + UninterpretedOption: MessageTypeDefinition + Value: MessageTypeDefinition + } + } + udpa: { + annotations: { + FieldMigrateAnnotation: MessageTypeDefinition + FileMigrateAnnotation: MessageTypeDefinition + MigrateAnnotation: MessageTypeDefinition + PackageVersionStatus: EnumTypeDefinition + StatusAnnotation: MessageTypeDefinition + VersioningAnnotation: MessageTypeDefinition + } + } + validate: { + AnyRules: MessageTypeDefinition + BoolRules: MessageTypeDefinition + BytesRules: MessageTypeDefinition + DoubleRules: MessageTypeDefinition + DurationRules: MessageTypeDefinition + EnumRules: MessageTypeDefinition + FieldRules: MessageTypeDefinition + Fixed32Rules: MessageTypeDefinition + Fixed64Rules: MessageTypeDefinition + FloatRules: MessageTypeDefinition + Int32Rules: MessageTypeDefinition + Int64Rules: MessageTypeDefinition + KnownRegex: EnumTypeDefinition + MapRules: MessageTypeDefinition + MessageRules: MessageTypeDefinition + RepeatedRules: MessageTypeDefinition + SFixed32Rules: MessageTypeDefinition + SFixed64Rules: MessageTypeDefinition + SInt32Rules: MessageTypeDefinition + SInt64Rules: MessageTypeDefinition + StringRules: MessageTypeDefinition + TimestampRules: MessageTypeDefinition + UInt32Rules: MessageTypeDefinition + UInt64Rules: MessageTypeDefinition + } + xds: { + annotations: { + v3: { + FieldStatusAnnotation: MessageTypeDefinition + FileStatusAnnotation: MessageTypeDefinition + MessageStatusAnnotation: MessageTypeDefinition + PackageVersionStatus: EnumTypeDefinition + StatusAnnotation: MessageTypeDefinition + } + } + core: { + v3: { + ContextParams: MessageTypeDefinition + } + } + } +} + diff --git a/packages/grpc-js-xds/src/http-filter.ts b/packages/grpc-js-xds/src/http-filter.ts index 29ce5958..f8da5b82 100644 --- a/packages/grpc-js-xds/src/http-filter.ts +++ b/packages/grpc-js-xds/src/http-filter.ts @@ -116,7 +116,7 @@ export function validateTopLevelFilter(httpFilter: HttpFilter__Output): boolean try { typeUrl = getTopLevelFilterUrl(encodedConfig); } catch (e) { - trace(httpFilter.name + ' validation failed with error ' + e.message); + trace(httpFilter.name + ' validation failed with error ' + (e as Error).message); return false; } const registryEntry = FILTER_REGISTRY.get(typeUrl); @@ -243,4 +243,4 @@ export function createHttpFilter(config: HttpFilterConfig, overrideConfig?: Http } else { return null; } -} \ No newline at end of file +} diff --git a/packages/grpc-js-xds/src/index.ts b/packages/grpc-js-xds/src/index.ts index 95c26a20..70aa5bef 100644 --- a/packages/grpc-js-xds/src/index.ts +++ b/packages/grpc-js-xds/src/index.ts @@ -23,6 +23,7 @@ import * as load_balancer_priority from './load-balancer-priority'; import * as load_balancer_weighted_target from './load-balancer-weighted-target'; import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager'; import * as xds_wrr_locality from './load-balancer-xds-wrr-locality'; +import * as ring_hash from './load-balancer-ring-hash'; import * as router_filter from './http-filter/router-filter'; import * as fault_injection_filter from './http-filter/fault-injection-filter'; import * as csds from './csds'; @@ -41,6 +42,7 @@ export function register() { load_balancer_weighted_target.setup(); load_balancer_xds_cluster_manager.setup(); xds_wrr_locality.setup(); + ring_hash.setup(); router_filter.setup(); fault_injection_filter.setup(); csds.setup(); diff --git a/packages/grpc-js-xds/src/load-balancer-priority.ts b/packages/grpc-js-xds/src/load-balancer-priority.ts index 4a4acb47..54e01fa8 100644 --- a/packages/grpc-js-xds/src/load-balancer-priority.ts +++ b/packages/grpc-js-xds/src/load-balancer-priority.ts @@ -41,9 +41,26 @@ const DEFAULT_FAILOVER_TIME_MS = 10_000; const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000; export interface LocalityEndpoint extends Endpoint { + /** + * A sequence of strings that determines how to divide endpoints up in priority and + * weighted_target. + */ localityPath: string[]; + /** + * The locality this endpoint is in. Used in wrr_locality and xds_cluster_impl. + */ locality: Locality__Output; - weight: number; + /** + * The load balancing weight for the entire locality that contains this + * endpoint. Used in xds_wrr_locality. + */ + localityWeight: number; + /** + * The overall load balancing weight for this endpoint, calculated as the + * product of the load balancing weight for this endpoint within its locality + * and the load balancing weight of the locality. Used in ring_hash. + */ + endpointWeight: number; }; export function isLocalityEndpoint( @@ -317,7 +334,7 @@ export class PriorityLoadBalancer implements LoadBalancer { * so that when the picker calls exitIdle, that in turn calls exitIdle on * the PriorityChildImpl, which will start the failover timer. */ if (state === ConnectivityState.IDLE) { - picker = new QueuePicker(this); + picker = new QueuePicker(this, picker); } this.channelControlHelper.updateState(state, picker); } diff --git a/packages/grpc-js-xds/src/load-balancer-ring-hash.ts b/packages/grpc-js-xds/src/load-balancer-ring-hash.ts new file mode 100644 index 00000000..a124d3b8 --- /dev/null +++ b/packages/grpc-js-xds/src/load-balancer-ring-hash.ts @@ -0,0 +1,507 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { experimental, logVerbosity, connectivityState, status, Metadata, ChannelOptions, LoadBalancingConfig } from '@grpc/grpc-js'; +import { isLocalityEndpoint } from './load-balancer-priority'; +import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; +import LeafLoadBalancer = experimental.LeafLoadBalancer; +import Endpoint = experimental.Endpoint; +import Picker = experimental.Picker; +import PickArgs = experimental.PickArgs; +import PickResult = experimental.PickResult; +import PickResultType = experimental.PickResultType; +import LoadBalancer = experimental.LoadBalancer; +import ChannelControlHelper = experimental.ChannelControlHelper; +import createChildChannelControlHelper = experimental.createChildChannelControlHelper; +import UnavailablePicker = experimental.UnavailablePicker; +import subchannelAddressToString = experimental.subchannelAddressToString; +import registerLoadBalancerType = experimental.registerLoadBalancerType; +import EndpointMap = experimental.EndpointMap; +import { loadXxhashApi, xxhashApi } from './xxhash'; +import { EXPERIMENTAL_RING_HASH } from './environment'; +import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util'; +import { RingHash__Output } from './generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash'; +import { Any__Output } from './generated/google/protobuf/Any'; +import { TypedExtensionConfig__Output } from './generated/envoy/config/core/v3/TypedExtensionConfig'; +import { LoadBalancingPolicy__Output } from './generated/envoy/config/cluster/v3/LoadBalancingPolicy'; +import { registerLbPolicy } from './lb-policy-registry'; + +const TRACER_NAME = 'ring_hash'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +const TYPE_NAME = 'ring_hash'; + +const DEFAULT_MIN_RING_SIZE = 1024; +const DEFAULT_MAX_RING_SIZE = 4096; +const ABSOLUTE_MAX_RING_SIZE = 8_388_608; +const DEFAULT_RING_SIZE_CAP = 4096; + +class RingHashLoadBalancingConfig implements TypedLoadBalancingConfig { + private minRingSize: number; + private maxRingSize: number; + constructor(minRingSize?: number, maxRingSize?: number) { + this.minRingSize = Math.min( + minRingSize ?? DEFAULT_MIN_RING_SIZE, + ABSOLUTE_MAX_RING_SIZE + ); + this.maxRingSize = Math.min( + maxRingSize ?? DEFAULT_MAX_RING_SIZE, + ABSOLUTE_MAX_RING_SIZE + ); + } + getLoadBalancerName(): string { + return TYPE_NAME; + } + toJsonObject(): object { + return { + [TYPE_NAME]: { + min_ring_size: this.minRingSize, + max_ring_size: this.maxRingSize, + } + }; + } + getMinRingSize() { + return this.minRingSize; + } + getMaxRingSize() { + return this.maxRingSize; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + static createFromJson(obj: any): TypedLoadBalancingConfig { + if ('min_ring_size' in obj) { + if (typeof obj.min_ring_size === 'number') { + if (obj.min_ring_size > ABSOLUTE_MAX_RING_SIZE) { + throw new Error(`ring_hash config field min_ring_size exceeds the cap of ${ABSOLUTE_MAX_RING_SIZE}: ${obj.min_ring_size}`); + } + } else { + throw new Error( + 'ring_hash config field min_ring_size must be a number if provided' + ); + } + } + if ('max_ring_size' in obj) { + if (typeof obj.max_ring_size === 'number') { + if (obj.max_ring_size > ABSOLUTE_MAX_RING_SIZE) { + throw new Error(`ring_hash config field max_ring_size exceeds the cap of ${ABSOLUTE_MAX_RING_SIZE}: ${obj.max_ring_size}`); + } + } else { + throw new Error( + 'ring_hash config field max_ring_size must be a number if provided' + ); + } + } + return new RingHashLoadBalancingConfig( + obj.min_ring_size, + obj.max_ring_size + ); + } +} + +interface RingEntry { + leafBalancer: LeafLoadBalancer; + hash: bigint; +} + +interface EndpointWeight { + endpoint: Endpoint; + weight: number; + normalizedWeight: number; +} + +class RingHashPicker implements Picker { + constructor(private ring: RingEntry[]) {} + /** + * Find the least index in the ring with a hash greater than or equal to the + * hash parameter, or 0 if no such index exists. + * @param hash + */ + private findIndexForHash(hash: bigint): number { + // Binary search to find the target index + let low = 0; + let high = this.ring.length; + let index = 0; + while (low <= high) { + /* Commonly in binary search, this operation can overflow and result in + * the wrong value. However, in this case the ring size is absolutely + * limtied to 1<<23, so low+high < MAX_SAFE_INTEGER */ + index = Math.floor((low + high) / 2); + if (index === this.ring.length) { + index = 0; + break; + } + const midval = this.ring[index].hash; + const midval1 = index === 0 ? 0n : this.ring[index - 1].hash; + if (hash <= midval && hash > midval1) { + break; + } + if (midval < hash) { + low = index + 1; + } else { + high = index - 1; + } + if (low > high) { + index = 0; + break; + } + } + return index; + } + pick(pickArgs: PickArgs): PickResult { + trace('Pick called. Hash=' + pickArgs.extraPickInfo.hash); + const firstIndex = this.findIndexForHash( + BigInt(pickArgs.extraPickInfo.hash) + ); + for (let i = 0; i < this.ring.length; i++) { + const index = (firstIndex + i) % this.ring.length; + const entryState = this.ring[index].leafBalancer.getConnectivityState(); + if (entryState === connectivityState.READY) { + return this.ring[index].leafBalancer.getPicker().pick(pickArgs); + } + if (entryState === connectivityState.IDLE) { + this.ring[index].leafBalancer.startConnecting(); + return { + pickResultType: PickResultType.QUEUE, + subchannel: null, + status: null, + onCallStarted: null, + onCallEnded: null, + }; + } + if (entryState === connectivityState.CONNECTING) { + return { + pickResultType: PickResultType.QUEUE, + subchannel: null, + status: null, + onCallStarted: null, + onCallEnded: null, + }; + } + } + return { + pickResultType: PickResultType.TRANSIENT_FAILURE, + status: { + code: status.UNAVAILABLE, + details: + 'ring_hash: invalid state: all child balancers in TRANSIENT_FAILURE', + metadata: new Metadata(), + }, + subchannel: null, + onCallStarted: null, + onCallEnded: null, + }; + } +} + +class RingHashLoadBalancer implements LoadBalancer { + /** + * Tracks endpoint repetition across address updates, to use an appropriate + * existing leaf load balancer for the same endpoint when possible. + */ + private leafMap = new EndpointMap(); + /** + * Tracks endpoints from a single address update, with their associated + * weights aggregated from all weights associated with that endpoint in that + * update. + */ + private leafWeightMap = new EndpointMap(); + private childChannelControlHelper: ChannelControlHelper; + private updatesPaused = false; + private currentState: connectivityState = connectivityState.IDLE; + private ring: RingEntry[] = []; + private ringHashSizeCap = DEFAULT_RING_SIZE_CAP; + constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) { + this.childChannelControlHelper = createChildChannelControlHelper( + channelControlHelper, + { + updateState: (state, picker) => { + this.calculateAndUpdateState(); + /* If this LB policy is in the TRANSIENT_FAILURE state, requests will + * not trigger new connections, so we need to explicitly try connecting + * to other endpoints that are currently IDLE to try to eventually + * connect to something. */ + if ( + state === connectivityState.TRANSIENT_FAILURE && + this.currentState === connectivityState.TRANSIENT_FAILURE + ) { + for (const leaf of this.leafMap.values()) { + const leafState = leaf.getConnectivityState(); + if (leafState === connectivityState.CONNECTING) { + break; + } + if (leafState === connectivityState.IDLE) { + leaf.startConnecting(); + break; + } + } + } + }, + } + ); + if (options['grpc.lb.ring_hash.ring_size_cap'] !== undefined) { + this.ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap']; + } + } + + private calculateAndUpdateState() { + if (this.updatesPaused) { + return; + } + const stateCounts = { + [connectivityState.READY]: 0, + [connectivityState.TRANSIENT_FAILURE]: 0, + [connectivityState.CONNECTING]: 0, + [connectivityState.IDLE]: 0, + [connectivityState.SHUTDOWN]: 0, + }; + for (const leaf of this.leafMap.values()) { + stateCounts[leaf.getConnectivityState()] += 1; + } + if (stateCounts[connectivityState.READY] > 0) { + this.updateState(connectivityState.READY, new RingHashPicker(this.ring)); + // REPORT READY + } else if (stateCounts[connectivityState.TRANSIENT_FAILURE] > 1) { + this.updateState( + connectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); + } else if (stateCounts[connectivityState.CONNECTING] > 0) { + this.updateState( + connectivityState.CONNECTING, + new RingHashPicker(this.ring) + ); + } else if ( + stateCounts[connectivityState.TRANSIENT_FAILURE] > 0 && + this.leafMap.size > 1 + ) { + this.updateState( + connectivityState.CONNECTING, + new RingHashPicker(this.ring) + ); + } else if (stateCounts[connectivityState.IDLE] > 0) { + this.updateState(connectivityState.IDLE, new RingHashPicker(this.ring)); + } else { + this.updateState( + connectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); + } + } + + private updateState(newState: connectivityState, picker: Picker) { + trace( + connectivityState[this.currentState] + + ' -> ' + + connectivityState[newState] + ); + this.currentState = newState; + this.channelControlHelper.updateState(newState, picker); + } + + private constructRing( + endpointList: Endpoint[], + config: RingHashLoadBalancingConfig + ) { + this.ring = []; + const endpointWeights: EndpointWeight[] = []; + let weightSum = 0; + for (const endpoint of endpointList) { + const weight = this.leafWeightMap.get(endpoint) ?? 1; + endpointWeights.push({ endpoint, weight, normalizedWeight: 0 }); + weightSum += weight; + } + /* The normalized weights sum to 1, with some small potential error due to + * the limitation of floating point precision. */ + let minNormalizedWeight = 1; + for (const endpointWeight of endpointWeights) { + endpointWeight.normalizedWeight = endpointWeight.weight / weightSum; + minNormalizedWeight = Math.min( + endpointWeight.normalizedWeight, + minNormalizedWeight + ); + } + const minRingSize = Math.min(config.getMinRingSize(), this.ringHashSizeCap); + const maxRingSize = Math.min(config.getMaxRingSize(), this.ringHashSizeCap); + /* Calculate a scale factor that meets the following conditions: + * 1. The result is between minRingSize and maxRingSize, inclusive + * 2. The smallest normalized weight is scaled to a whole number, if it + * does not violate the previous condition. + * The size of the ring is ceil(scale) + */ + const scale = Math.min( + Math.ceil(minNormalizedWeight * minRingSize) / minNormalizedWeight, + maxRingSize + ); + trace('Creating a ring with size ' + Math.ceil(scale)); + /* For each endpoint, create a number of entries proportional to its + * weight, such that the total number of entries is equal to ceil(scale). + */ + let currentHashes = 0; + let targetHashes = 0; + for (const endpointWeight of endpointWeights) { + const addressString = subchannelAddressToString( + endpointWeight.endpoint.addresses[0] + ); + targetHashes += scale * endpointWeight.normalizedWeight; + const leafBalancer = this.leafMap.get(endpointWeight.endpoint); + if (!leafBalancer) { + throw new Error( + 'ring_hash: Invalid state: endpoint found in leafWeightMap but not in leafMap' + ); + } + let count = 0; + while (currentHashes < targetHashes) { + const hashKey = `${addressString}_${count}`; + const hash = xxhashApi!.h64(hashKey, 0n); + this.ring.push({ hash, leafBalancer }); + currentHashes++; + count++; + } + } + /* The ring is sorted by the hash so that it can be efficiently searched + * for a hash that is closest to any arbitrary hash. */ + this.ring.sort((a, b) => { + if (a.hash > b.hash) { + return 1; + } else if (a.hash < b.hash) { + return -1; + } else { + return 0; + } + }); + } + + updateAddressList( + endpointList: Endpoint[], + lbConfig: TypedLoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + if (!(lbConfig instanceof RingHashLoadBalancingConfig)) { + trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); + return; + } + trace('Received update with config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); + this.updatesPaused = true; + this.leafWeightMap.clear(); + const dedupedEndpointList: Endpoint[] = []; + for (const endpoint of endpointList) { + const leafBalancer = this.leafMap.get(endpoint); + if (leafBalancer) { + leafBalancer.updateEndpoint(endpoint); + } else { + this.leafMap.set( + endpoint, + new LeafLoadBalancer(endpoint, this.childChannelControlHelper, this.options) + ); + } + const weight = this.leafWeightMap.get(endpoint); + if (weight === undefined) { + dedupedEndpointList.push(endpoint); + } + this.leafWeightMap.set(endpoint, (weight ?? 0) + (isLocalityEndpoint(endpoint) ? endpoint.endpointWeight : 1)); + } + const removedLeaves = this.leafMap.deleteMissing(endpointList); + for (const leaf of removedLeaves) { + leaf.destroy(); + } + loadXxhashApi().then(() => { + this.constructRing(dedupedEndpointList, lbConfig); + this.updatesPaused = false; + this.calculateAndUpdateState(); + }); + } + exitIdle(): void { + /* This operation does not make sense here. We don't want to make the whole + * balancer exit idle, and instead propagate that to individual chlidren as + * relevant. */ + } + resetBackoff(): void { + // There is no backoff to reset here + } + destroy(): void { + this.ring = []; + for (const child of this.leafMap.values()) { + child.destroy(); + } + this.leafMap.clear(); + this.leafWeightMap.clear(); + } + getTypeName(): string { + return TYPE_NAME; + } +} + +const RING_HASH_TYPE_URL = 'type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash'; + +const resourceRoot = loadProtosWithOptionsSync([ + 'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto'], { + keepCase: true, + includeDirs: [ + // Paths are relative to src/build + __dirname + '/../../deps/envoy-api/', + __dirname + '/../../deps/xds/', + __dirname + '/../../deps/protoc-gen-validate' + ], + } +); + +const toObjectOptions = { + longs: String, + enums: String, + defaults: true, + oneofs: true +} + +function decodeRingHash(message: Any__Output): RingHash__Output { + const name = message.type_url.substring(message.type_url.lastIndexOf('/') + 1); + const type = resourceRoot.lookup(name); + if (type) { + const decodedMessage = (type as any).decode(message.value); + return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as RingHash__Output; + } else { + throw new Error(`TypedStruct parsing error: unexpected type URL ${message.type_url}`); + } +} + +function convertToLoadBalancingPolicy(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig { + if (protoPolicy.typed_config?.type_url !== RING_HASH_TYPE_URL) { + throw new Error(`Ring Hash LB policy parsing error: unexpected type URL ${protoPolicy.typed_config?.type_url}`); + } + const ringHashMessage = decodeRingHash(protoPolicy.typed_config); + if (ringHashMessage.hash_function !== 'XX_HASH') { + throw new Error(`Ring Hash LB policy parsing error: unexpected hash function ${ringHashMessage.hash_function}`); + } + return { + [TYPE_NAME]: { + min_ring_size: ringHashMessage.minimum_ring_size?.value ?? 1024, + max_ring_size: ringHashMessage.maximum_ring_size?.value ?? 8_388_608 + } + }; +} + +export function setup() { + if (EXPERIMENTAL_RING_HASH) { + registerLoadBalancerType( + TYPE_NAME, + RingHashLoadBalancer, + RingHashLoadBalancingConfig + ); + registerLbPolicy(RING_HASH_TYPE_URL, convertToLoadBalancingPolicy); + } +} diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts index 3560d848..99059dca 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -204,35 +204,7 @@ class XdsClusterManager implements LoadBalancer { } else { connectivityState = ConnectivityState.TRANSIENT_FAILURE; } - /* For each of the states CONNECTING, IDLE, and TRANSIENT_FAILURE, there is - * exactly one corresponding picker, so if the state is one of those and - * that does not change, no new information is provided by passing the - * new state upward. */ - if (connectivityState === this.currentState && connectivityState !== ConnectivityState.READY) { - return; - } - let picker: Picker; - - switch (connectivityState) { - case ConnectivityState.READY: - picker = new XdsClusterManagerPicker(pickerMap); - break; - case ConnectivityState.CONNECTING: - case ConnectivityState.IDLE: - picker = new QueuePicker(this); - break; - default: - picker = new UnavailablePicker({ - code: Status.UNAVAILABLE, - details: 'xds_cluster_manager: all children report state TRANSIENT_FAILURE', - metadata: new Metadata() - }); - } - trace( - 'Transitioning to ' + - ConnectivityState[connectivityState] - ); - this.channelControlHelper.updateState(connectivityState, picker); + this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap)); } updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts index c4bf984f..29c0b6f3 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts @@ -115,10 +115,15 @@ class XdsClusterResolverLoadBalancingConfig implements TypedLoadBalancingConfig } } +interface WeightedEndpoint { + endpoint: Endpoint; + weight: number; +} + interface LocalityEntry { locality: Locality__Output; weight: number; - endpoints: Endpoint[]; + endpoints: WeightedEndpoint[]; } interface PriorityEntry { @@ -166,16 +171,19 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt if (!endpoint.load_balancing_weight) { continue; } - const endpoints: Endpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map( + const endpoints: WeightedEndpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map( (lbEndpoint) => { /* The validator in the XdsClient class ensures that each endpoint has * a socket_address with an IP address and a port_value. */ const socketAddress = lbEndpoint.endpoint!.address!.socket_address!; return { - addresses: [{ - host: socketAddress.address!, - port: socketAddress.port_value!, - }] + endpoint: { + addresses: [{ + host: socketAddress.address!, + port: socketAddress.port_value!, + }] + }, + weight: lbEndpoint.load_balancing_weight?.value ?? 1 }; } ); @@ -211,7 +219,7 @@ function getDnsPriorities(endpoints: Endpoint[]): PriorityEntry[] { sub_zone: '' }, weight: 1, - endpoints: endpoints + endpoints: endpoints.map(endpoint => ({endpoint: endpoint, weight: 1})) }], dropCategories: [] }]; @@ -295,15 +303,16 @@ export class XdsClusterResolver implements LoadBalancer { newPriorityNames[priority] = newPriorityName; for (const localityObj of priorityEntry.localities) { - for (const endpoint of localityObj.endpoints) { + for (const weightedEndpoint of localityObj.endpoints) { endpointList.push({ localityPath: [ newPriorityName, localityToName(localityObj.locality), ], locality: localityObj.locality, - weight: localityObj.weight, - ...endpoint + localityWeight: localityObj.weight, + endpointWeight: localityObj.weight * weightedEndpoint.weight, + ...weightedEndpoint.endpoint }); } newLocalityPriorities.set(localityToName(localityObj.locality), priority); diff --git a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts index f3fbcd51..3fb57d6e 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts @@ -90,7 +90,7 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer { if (!(localityName in targets)) { targets[localityName] = { child_policy: lbConfig.getChildPolicy(), - weight: address.weight + weight: address.localityWeight }; } } diff --git a/packages/grpc-js-xds/src/matcher.ts b/packages/grpc-js-xds/src/matcher.ts index 148df7f8..b657d32c 100644 --- a/packages/grpc-js-xds/src/matcher.ts +++ b/packages/grpc-js-xds/src/matcher.ts @@ -71,7 +71,7 @@ export class SafeRegexValueMatcher implements ValueMatcher { const numberRegex = new RE2(/^-?\d+$/u); export class RangeValueMatcher implements ValueMatcher { - constructor(private start: BigInt, private end: BigInt) {} + constructor(private start: bigint, private end: bigint) {} apply(value: string) { if (!numberRegex.test(value)) { @@ -264,4 +264,4 @@ export class FullMatcher implements Matcher { headers: ${this.headerMatchers.map(matcher => matcher.toString()).join('\n\t')} fraction: ${this.fraction ? fractionToString(this.fraction): 'none'}`; } -} \ No newline at end of file +} diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index cd830d52..5182e100 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -34,18 +34,19 @@ import { HeaderMatcher__Output } from './generated/envoy/config/route/v3/HeaderM import ConfigSelector = experimental.ConfigSelector; import { ContainsValueMatcher, ExactValueMatcher, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher'; import { envoyFractionToFraction, Fraction } from "./fraction"; -import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action'; +import { HashPolicy, RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action'; import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resources'; import Duration = experimental.Duration; import { Duration__Output } from './generated/google/protobuf/Duration'; import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter'; -import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY } from './environment'; +import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY, EXPERIMENTAL_RING_HASH } from './environment'; import Filter = experimental.Filter; import FilterFactory = experimental.FilterFactory; import { BootstrapInfo, loadBootstrapInfo, validateBootstrapConfig } from './xds-bootstrap'; import { ListenerResourceType } from './xds-resource-type/listener-resource-type'; import { RouteConfigurationResourceType } from './xds-resource-type/route-config-resource-type'; import { protoDurationToDuration } from './duration'; +import { loadXxhashApi } from './xxhash'; const TRACER_NAME = 'xds_resolver'; @@ -381,7 +382,11 @@ class XdsResolver implements Resolver { } } - private handleRouteConfig(routeConfig: RouteConfiguration__Output) { + private async handleRouteConfig(routeConfig: RouteConfiguration__Output) { + /* We need to load the xxhash API before this function finishes, because + * it is invoked in the config selector, which can be called immediately + * after this function returns. */ + await loadXxhashApi(); this.latestRouteConfig = routeConfig; /* Select the virtual host using the default authority override if it * exists, and the channel target otherwise. */ @@ -456,6 +461,26 @@ class XdsResolver implements Resolver { } } } + const hashPolicies: HashPolicy[] = []; + if (EXPERIMENTAL_RING_HASH) { + for (const routeHashPolicy of route.route!.hash_policy) { + if (routeHashPolicy.policy_specifier === 'header') { + const headerPolicy = routeHashPolicy.header!; + hashPolicies.push({ + type: 'HEADER', + terminal: routeHashPolicy.terminal, + headerName: headerPolicy.header_name, + regex: headerPolicy.regex_rewrite?.pattern ? new RE2(headerPolicy.regex_rewrite.pattern.regex, 'ug') : undefined, + regexSubstitution: headerPolicy.regex_rewrite?.substitution + }); + } else if (routeHashPolicy.policy_specifier === 'filter_state' && routeHashPolicy.filter_state!.key === 'io.grpc.channel_id') { + hashPolicies.push({ + type: 'CHANNEL_ID', + terminal: routeHashPolicy.terminal + }); + } + } + } switch (route.route!.cluster_specifier) { case 'cluster_header': continue; @@ -483,7 +508,7 @@ class XdsResolver implements Resolver { } } } - routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories); + routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories, hashPolicies); break; } case 'weighted_clusters': { @@ -525,7 +550,7 @@ class XdsResolver implements Resolver { } weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories}); } - routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy}); + routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy}, hashPolicies); break; } default: @@ -554,7 +579,7 @@ class XdsResolver implements Resolver { this.clusterRefcounts.set(name, {inLastConfig: true, refCount: 0}); } } - const configSelector: ConfigSelector = (methodName, metadata) => { + const configSelector: ConfigSelector = (methodName, metadata, channelId) => { for (const {matcher, action} of matchList) { if (matcher.apply(methodName, metadata)) { const clusterResult = action.getCluster(); @@ -562,10 +587,16 @@ class XdsResolver implements Resolver { const onCommitted = () => { this.unrefCluster(clusterResult.name); } + let hash: string; + if (EXPERIMENTAL_RING_HASH) { + hash = `${action.getHash(metadata, channelId)}`; + } else { + hash = ''; + } return { methodConfig: clusterResult.methodConfig, onCommitted: onCommitted, - pickInformation: {cluster: clusterResult.name}, + pickInformation: {cluster: clusterResult.name, hash: hash}, status: status.OK, dynamicFilterFactories: clusterResult.dynamicFilterFactories }; @@ -573,8 +604,8 @@ class XdsResolver implements Resolver { } return { methodConfig: {name: []}, - // cluster won't be used here, but it's set because of some TypeScript weirdness - pickInformation: {cluster: ''}, + // These fields won't be used here, but they're set because of some TypeScript weirdness + pickInformation: {cluster: '', hash: ''}, status: status.UNAVAILABLE, dynamicFilterFactories: [] }; diff --git a/packages/grpc-js-xds/src/route-action.ts b/packages/grpc-js-xds/src/route-action.ts index 83530f1b..2f87fee9 100644 --- a/packages/grpc-js-xds/src/route-action.ts +++ b/packages/grpc-js-xds/src/route-action.ts @@ -14,10 +14,12 @@ * limitations under the License. */ -import { MethodConfig, experimental } from '@grpc/grpc-js'; +import { Metadata, MethodConfig, experimental } from '@grpc/grpc-js'; import Duration = experimental.Duration; import Filter = experimental.Filter; import FilterFactory = experimental.FilterFactory; +import { RE2 } from 're2-wasm'; +import { xxhashApi } from './xxhash'; export interface ClusterResult { name: string; @@ -28,6 +30,7 @@ export interface ClusterResult { export interface RouteAction { toString(): string; getCluster(): ClusterResult; + getHash(metadata: Metadata, channelId: number): bigint; } function durationToLogString(duration: Duration) { @@ -39,8 +42,83 @@ function durationToLogString(duration: Duration) { } } +export interface HashPolicy { + type: 'HEADER' | 'CHANNEL_ID'; + terminal: boolean; + headerName?: string; + regex?: RE2; + regexSubstitution?: string; +} + +/** + * Must be called only after xxhash.loadXxhashApi() resolves. + * @param hashPolicies + * @param metadata + * @param channelId + */ +function getHash(hashPolicies: HashPolicy[], metadata: Metadata, channelId: number): bigint { + let hash: bigint | null = null; + for (const policy of hashPolicies) { + let newHash: bigint | null = null; + switch (policy.type) { + case 'CHANNEL_ID': + newHash = xxhashApi!.h64(`${channelId}`, 0n); + break; + case 'HEADER': { + if (!policy.headerName) { + break; + } + if (policy.headerName.endsWith('-bin')) { + break; + } + let headerString: string; + if (policy.headerName === 'content-type') { + headerString = 'application/grpc'; + } else { + const headerValues = metadata.get(policy.headerName); + if (headerValues.length === 0) { + break; + } + headerString = headerValues.join(','); + } + let rewrittenHeaderString = headerString; + if (policy.regex && policy.regexSubstitution) { + /* The JS string replace method uses $-prefixed patterns to produce + * other strings. See + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String/replace#specifying_a_string_as_the_replacement + * RE2-based regex substitutions use \n where n is a number to refer + * to capture group n, and they otherwise have no special replacement + * patterns. See + * https://github.com/envoyproxy/envoy/blob/2443032526cf6e50d63d35770df9473dd0460fc0/api/envoy/type/matcher/v3/regex.proto#L79-L87 + * We convert an RE2 regex substitution into a string substitution by + * first replacing each "$" with "$$" (which produces "$" in the + * output), and then replace each "\n" for any whole number n with + * "$n". */ + const regexSubstitution = policy.regexSubstitution.replace(/\$/g, '$$$$').replace(/\\(\d+)/g, '$$$1'); + rewrittenHeaderString = headerString.replace(policy.regex, regexSubstitution); + } + newHash = xxhashApi!.h64(rewrittenHeaderString, 0n); + break; + } + } + if (hash === null) { + hash = newHash; + } else if (newHash !== null) { + hash = ((hash << 1n) | (hash >> 63n)) ^ newHash; + } + if (policy.terminal && hash !== null) { + break; + } + } + if (hash === null) { + return xxhashApi!.h64(`${Math.random()}`, 0n); + } else { + return hash; + } +} + export class SingleClusterRouteAction implements RouteAction { - constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory[]) {} + constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory[], private hashPolicies: HashPolicy[]) {} getCluster() { return { @@ -50,6 +128,10 @@ export class SingleClusterRouteAction implements RouteAction { }; } + getHash(metadata: Metadata, channelId: number): bigint { + return getHash(this.hashPolicies, metadata, channelId); + } + toString() { return 'SingleCluster(' + this.cluster + ', ' + JSON.stringify(this.methodConfig) + ')'; } @@ -72,7 +154,7 @@ export class WeightedClusterRouteAction implements RouteAction { * The weighted cluster choices represented as a CDF */ private clusterChoices: ClusterChoice[]; - constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig) { + constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig, private hashPolicies: HashPolicy[]) { this.clusterChoices = []; let lastNumerator = 0; for (const clusterWeight of clusters) { @@ -96,6 +178,10 @@ export class WeightedClusterRouteAction implements RouteAction { return {name: '', methodConfig: this.methodConfig, dynamicFilterFactories: []}; } + getHash(metadata: Metadata, channelId: number): bigint { + return getHash(this.hashPolicies, metadata, channelId); + } + toString() { const clusterListString = this.clusters.map(({name, weight}) => '(' + name + ':' + weight + ')').join(', ') return 'WeightedCluster(' + clusterListString + ', ' + JSON.stringify(this.methodConfig) + ')'; diff --git a/packages/grpc-js-xds/src/xds-bootstrap.ts b/packages/grpc-js-xds/src/xds-bootstrap.ts index fccd3edc..536439dd 100644 --- a/packages/grpc-js-xds/src/xds-bootstrap.ts +++ b/packages/grpc-js-xds/src/xds-bootstrap.ts @@ -357,14 +357,14 @@ export function loadBootstrapInfo(): BootstrapInfo { try { rawBootstrap = fs.readFileSync(bootstrapPath, { encoding: 'utf8'}); } catch (e) { - throw new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${e.message}`); + throw new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${(e as Error).message}`); } try { const parsedFile = JSON.parse(rawBootstrap); loadedBootstrapInfo = validateBootstrapConfig(parsedFile); return loadedBootstrapInfo; } catch (e) { - throw new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}`) + throw new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${(e as Error).message}`) } } @@ -383,7 +383,7 @@ export function loadBootstrapInfo(): BootstrapInfo { loadedBootstrapInfo = validateBootstrapConfig(parsedConfig); } catch (e) { throw new Error( - `Failed to parse xDS bootstrap config from environment variable GRPC_XDS_BOOTSTRAP_CONFIG with error ${e.message}` + `Failed to parse xDS bootstrap config from environment variable GRPC_XDS_BOOTSTRAP_CONFIG with error ${(e as Error).message}` ); } diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 464e2659..d1aed926 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -208,14 +208,14 @@ class AdsResponseParser { try { decodeResult = this.result.type.decode(decodeContext, resource); } catch (e) { - this.result.errors.push(`${errorPrefix} ${e.message}`); + this.result.errors.push(`${errorPrefix} ${(e as Error).message}`); return; } let parsedName: XdsResourceName; try { parsedName = parseXdsResourceName(decodeResult.name, this.result.type!.getTypeUrl()); } catch (e) { - this.result.errors.push(`${errorPrefix} ${e.message}`); + this.result.errors.push(`${errorPrefix} ${(e as Error).message}`); return; } this.adsCallState.typeStates.get(this.result.type!)?.subscribedResources.get(parsedName.authority)?.get(parsedName.key)?.markSeen(); @@ -250,7 +250,7 @@ class AdsResponseParser { if (!decodeResult.value) { return; } - this.adsCallState.client.trace('Parsed resource of type ' + this.result.type.getTypeUrl() + ': ' + JSON.stringify(decodeResult.value, undefined, 2)); + this.adsCallState.client.trace('Parsed resource of type ' + this.result.type.getTypeUrl() + ': ' + JSON.stringify(decodeResult.value, (key, value) => (value && value.type === 'Buffer' && Array.isArray(value.data)) ? (value.data as Number[]).map(n => n.toString(16)).join('') : value, 2)); this.result.haveValidResources = true; if (this.result.type.resourcesEqual(resourceState.cachedResource, decodeResult.value)) { return; diff --git a/packages/grpc-js-xds/src/xds-resource-type/cluster-resource-type.ts b/packages/grpc-js-xds/src/xds-resource-type/cluster-resource-type.ts index 1e8ea41f..c4081baa 100644 --- a/packages/grpc-js-xds/src/xds-resource-type/cluster-resource-type.ts +++ b/packages/grpc-js-xds/src/xds-resource-type/cluster-resource-type.ts @@ -21,7 +21,7 @@ import { LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js"; import { XdsServerConfig } from "../xds-bootstrap"; import { Duration__Output } from "../generated/google/protobuf/Duration"; import { OutlierDetection__Output } from "../generated/envoy/config/cluster/v3/OutlierDetection"; -import { EXPERIMENTAL_CUSTOM_LB_CONFIG, EXPERIMENTAL_OUTLIER_DETECTION } from "../environment"; +import { EXPERIMENTAL_CUSTOM_LB_CONFIG, EXPERIMENTAL_OUTLIER_DETECTION, EXPERIMENTAL_RING_HASH } from "../environment"; import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster"; import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value"; import { Any__Output } from "../generated/google/protobuf/Any"; @@ -150,6 +150,24 @@ export class ClusterResourceType extends XdsResourceType { child_policy: [{round_robin: {}}] } }; + } else if(EXPERIMENTAL_RING_HASH && message.lb_policy === 'RING_HASH') { + if (message.ring_hash_lb_config && message.ring_hash_lb_config.hash_function !== 'XX_HASH') { + return null; + } + const minRingSize = message.ring_hash_lb_config?.minimum_ring_size ? Number(message.ring_hash_lb_config.minimum_ring_size.value) : 1024; + if (minRingSize > 8_388_608) { + return null; + } + const maxRingSize = message.ring_hash_lb_config?.maximum_ring_size ? Number(message.ring_hash_lb_config.maximum_ring_size.value) : 8_388_608; + if (maxRingSize > 8_388_608) { + return null; + } + lbPolicyConfig = { + ring_hash: { + min_ring_size: minRingSize, + max_ring_size: maxRingSize + } + }; } else { return null; } @@ -264,6 +282,7 @@ export class ClusterResourceType extends XdsResourceType { ); } const message = decodeSingleResource(CDS_TYPE_URL, resource.value); + trace('Decoded raw resource of type ' + CDS_TYPE_URL + ': ' + JSON.stringify(message, undefined, 2)); const validatedMessage = this.validateResource(context, message); if (validatedMessage) { return { diff --git a/packages/grpc-js-xds/src/xds-resource-type/endpoint-resource-type.ts b/packages/grpc-js-xds/src/xds-resource-type/endpoint-resource-type.ts index 6ffd7788..093ca52e 100644 --- a/packages/grpc-js-xds/src/xds-resource-type/endpoint-resource-type.ts +++ b/packages/grpc-js-xds/src/xds-resource-type/endpoint-resource-type.ts @@ -101,6 +101,7 @@ export class EndpointResourceType extends XdsResourceType { ); } const message = decodeSingleResource(EDS_TYPE_URL, resource.value); + trace('Decoded raw resource of type ' + EDS_TYPE_URL + ': ' + JSON.stringify(message, undefined, 2)); const validatedMessage = this.validateResource(message); if (validatedMessage) { return { diff --git a/packages/grpc-js-xds/src/xds-resource-type/listener-resource-type.ts b/packages/grpc-js-xds/src/xds-resource-type/listener-resource-type.ts index 20e243b8..cf5d4d59 100644 --- a/packages/grpc-js-xds/src/xds-resource-type/listener-resource-type.ts +++ b/packages/grpc-js-xds/src/xds-resource-type/listener-resource-type.ts @@ -106,6 +106,7 @@ export class ListenerResourceType extends XdsResourceType { ); } const message = decodeSingleResource(LDS_TYPE_URL, resource.value); + trace('Decoded raw resource of type ' + LDS_TYPE_URL + ': ' + JSON.stringify(message, (key, value) => (value && value.type === 'Buffer' && Array.isArray(value.data)) ? (value.data as Number[]).map(n => n.toString(16)).join('') : value, 2)); const validatedMessage = this.validateResource(message); if (validatedMessage) { return { diff --git a/packages/grpc-js-xds/src/xds-resource-type/route-config-resource-type.ts b/packages/grpc-js-xds/src/xds-resource-type/route-config-resource-type.ts index cdc2e319..766a8438 100644 --- a/packages/grpc-js-xds/src/xds-resource-type/route-config-resource-type.ts +++ b/packages/grpc-js-xds/src/xds-resource-type/route-config-resource-type.ts @@ -15,6 +15,7 @@ * */ +import { experimental, logVerbosity } from "@grpc/grpc-js"; import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from "../environment"; import { RetryPolicy__Output } from "../generated/envoy/config/route/v3/RetryPolicy"; import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration"; @@ -24,6 +25,11 @@ import { validateOverrideFilter } from "../http-filter"; import { RDS_TYPE_URL, decodeSingleResource } from "../resources"; import { Watcher, XdsClient } from "../xds-client"; import { XdsDecodeContext, XdsDecodeResult, XdsResourceType } from "./xds-resource-type"; +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} const SUPPORTED_PATH_SPECIFIERS = ['prefix', 'path', 'safe_regex']; const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [ @@ -169,6 +175,7 @@ export class RouteConfigurationResourceType extends XdsResourceType { ); } const message = decodeSingleResource(RDS_TYPE_URL, resource.value); + trace('Decoded raw resource of type ' + RDS_TYPE_URL + ': ' + JSON.stringify(message, undefined, 2)); const validatedMessage = this.validateResource(message); if (validatedMessage) { return { diff --git a/packages/grpc-js-xds/src/xxhash.ts b/packages/grpc-js-xds/src/xxhash.ts new file mode 100644 index 00000000..63f68af2 --- /dev/null +++ b/packages/grpc-js-xds/src/xxhash.ts @@ -0,0 +1,31 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* The simpler `import xxhash from 'xxhash-wasm';` doesn't compile correctly + * to CommonJS require calls for some reason, so we use this import to get + * the type, and then an explicit require call to get the actual value. */ +import xxhashImport from 'xxhash-wasm'; +const xxhash: typeof xxhashImport = require('xxhash-wasm'); + +export let xxhashApi: Awaited> | null = null; + +export async function loadXxhashApi() { + if (!xxhashApi) { + xxhashApi = await xxhash(); + } + return xxhashApi; +} diff --git a/packages/grpc-js-xds/test/framework.ts b/packages/grpc-js-xds/test/framework.ts index f89a4680..bd6f270d 100644 --- a/packages/grpc-js-xds/test/framework.ts +++ b/packages/grpc-js-xds/test/framework.ts @@ -70,7 +70,7 @@ export interface FakeCluster { } export class FakeEdsCluster implements FakeCluster { - constructor(private clusterName: string, private endpointName: string, private endpoints: Endpoint[], private loadBalancingPolicyOverride?: Any) {} + constructor(private clusterName: string, private endpointName: string, private endpoints: Endpoint[], private loadBalancingPolicyOverride?: Any | 'RING_HASH') {} getEndpointConfig(): ClusterLoadAssignment { return { @@ -94,7 +94,9 @@ export class FakeEdsCluster implements FakeCluster { ] } }; - if (this.loadBalancingPolicyOverride) { + if (this.loadBalancingPolicyOverride === 'RING_HASH') { + result.lb_policy = 'RING_HASH'; + } else if (this.loadBalancingPolicyOverride) { result.load_balancing_policy = { policies: [ { @@ -257,8 +259,14 @@ function createRouteConfig(route: FakeRoute): Route { prefix: '' }, route: { - cluster: route.cluster.getName() - } + cluster: route.cluster.getName(), + // Default to consistent hash + hash_policy: [{ + filter_state: { + key: 'io.grpc.channel_id' + } + }] + }, }; } else { return { @@ -271,7 +279,13 @@ function createRouteConfig(route: FakeRoute): Route { name: clusterWeight.cluster.getName(), weight: {value: clusterWeight.weight} })) - } + }, + // Default to consistent hash + hash_policy: [{ + filter_state: { + key: 'io.grpc.channel_id' + } + }] } } } diff --git a/packages/grpc-js-xds/test/test-confg-parsing.ts b/packages/grpc-js-xds/test/test-confg-parsing.ts index 740934ba..c185c852 100644 --- a/packages/grpc-js-xds/test/test-confg-parsing.ts +++ b/packages/grpc-js-xds/test/test-confg-parsing.ts @@ -19,6 +19,7 @@ import { experimental, LoadBalancingConfig } from "@grpc/grpc-js"; import { register } from "../src"; import assert = require("assert"); import parseLoadbalancingConfig = experimental.parseLoadBalancingConfig; +import { EXPERIMENTAL_RING_HASH } from "../src/environment"; register(); @@ -34,6 +35,7 @@ interface TestCase { input: object, output?: object; error?: RegExp; + skipIf?: boolean; } /* The main purpose of these tests is to verify that configs that are expected @@ -311,6 +313,41 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = { } } } + ], + ring_hash: [ + { + name: 'empty config', + input: {}, + output: { + min_ring_size: 1024, + max_ring_size: 4096 + }, + skipIf: !EXPERIMENTAL_RING_HASH + }, + { + name: 'populated config', + input: { + min_ring_size: 2048, + max_ring_size: 8192 + }, + skipIf: !EXPERIMENTAL_RING_HASH + }, + { + name: 'min_ring_size too large', + input: { + min_ring_size: 8_388_609 + }, + error: /min_ring_size/, + skipIf: !EXPERIMENTAL_RING_HASH + }, + { + name: 'max_ring_size too large', + input: { + max_ring_size: 8_388_609 + }, + error: /max_ring_size/, + skipIf: !EXPERIMENTAL_RING_HASH + } ] } @@ -318,7 +355,10 @@ describe('Load balancing policy config parsing', () => { for (const [lbPolicyName, testCases] of Object.entries(allTestCases)) { describe(lbPolicyName, () => { for (const testCase of testCases) { - it(testCase.name, () => { + it(testCase.name, function() { + if (testCase.skipIf) { + this.skip(); + } const lbConfigInput = {[lbPolicyName]: testCase.input}; if (testCase.error) { assert.throws(() => { diff --git a/packages/grpc-js-xds/test/test-ring-hash.ts b/packages/grpc-js-xds/test/test-ring-hash.ts new file mode 100644 index 00000000..20d9eeed --- /dev/null +++ b/packages/grpc-js-xds/test/test-ring-hash.ts @@ -0,0 +1,173 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Backend } from "./backend"; +import { XdsTestClient } from "./client"; +import { FakeEdsCluster, FakeRouteGroup } from "./framework"; +import { XdsServer } from "./xds-server"; + +import { register } from "../src"; +import assert = require("assert"); +import { Any } from "../src/generated/google/protobuf/Any"; +import { AnyExtension } from "@grpc/proto-loader"; +import { RingHash } from "../src/generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash"; +import { EXPERIMENTAL_RING_HASH } from "../src/environment"; + +register(); + +describe('Ring hash LB policy', () => { + let xdsServer: XdsServer; + let client: XdsTestClient; + beforeEach(done => { + xdsServer = new XdsServer(); + xdsServer.startServer(error => { + done(error); + }); + }); + afterEach(() => { + client?.close(); + xdsServer?.shutdownServer(); + }); + it('Should route requests to the single backend with the old lbPolicy field', function(done) { + if (!EXPERIMENTAL_RING_HASH) { + this.skip(); + } + const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], 'RING_HASH'); + const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + client.stopCalls(); + assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`); + } + }) + client = XdsTestClient.createFromServer('listener1', xdsServer); + client.sendOneCall(done); + }, reason => done(reason)); + }); + it('Should route requests to the single backend with the new load_balancing_policy field', function(done) { + if (!EXPERIMENTAL_RING_HASH) { + this.skip(); + } + const lbPolicy: AnyExtension & RingHash = { + '@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash', + hash_function: 'XX_HASH' + }; + const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy); + const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + client.stopCalls(); + assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`); + } + }) + client = XdsTestClient.createFromServer('listener1', xdsServer); + client.sendOneCall(done); + }, reason => done(reason)); + }); + it('Should route all identical requests to the same backend', function(done) { + if (!EXPERIMENTAL_RING_HASH) { + this.skip(); + } + const backend1 = new Backend(); + const backend2 = new Backend() + const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend1, backend2], locality:{region: 'region1'}}], 'RING_HASH'); + const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + client.stopCalls(); + assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`); + } + }) + client = XdsTestClient.createFromServer('listener1', xdsServer); + client.sendNCalls(10, error => { + assert.ifError(error); + assert((backend1.getCallCount() === 0) !== (backend2.getCallCount() === 0)); + done(); + }) + }, reason => done(reason)); + }); + it('Should fallback to a second backend if the first one goes down', function(done) { + if (!EXPERIMENTAL_RING_HASH) { + this.skip(); + } + const backends = [new Backend(), new Backend(), new Backend()]; + const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: backends, locality:{region: 'region1'}}], 'RING_HASH'); + const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + client.stopCalls(); + assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`); + } + }) + client = XdsTestClient.createFromServer('listener1', xdsServer); + client.sendNCalls(100, error => { + assert.ifError(error); + let backendWithTraffic: number | null = null; + for (let i = 0; i < backends.length; i++) { + if (backendWithTraffic === null) { + if (backends[i].getCallCount() > 0) { + backendWithTraffic = i; + } + } else { + assert.strictEqual(backends[i].getCallCount(), 0, `Backends ${backendWithTraffic} and ${i} both got traffic`); + } + } + assert.notStrictEqual(backendWithTraffic, null, 'No backend got traffic'); + backends[backendWithTraffic!].shutdown(error => { + assert.ifError(error); + backends[backendWithTraffic!].resetCallCount(); + client.sendNCalls(100, error => { + assert.ifError(error); + let backendWithTraffic2: number | null = null; + for (let i = 0; i < backends.length; i++) { + if (backendWithTraffic2 === null) { + if (backends[i].getCallCount() > 0) { + backendWithTraffic2 = i; + } + } else { + assert.strictEqual(backends[i].getCallCount(), 0, `Backends ${backendWithTraffic2} and ${i} both got traffic`); + } + } + assert.notStrictEqual(backendWithTraffic2, null, 'No backend got traffic'); + assert.notStrictEqual(backendWithTraffic2, backendWithTraffic, `Traffic went to the same backend ${backendWithTraffic} after shutdown`); + done(); + }); + }); + }); + }, reason => done(reason)); + }) +}); diff --git a/packages/grpc-js-xds/test/xds-server.ts b/packages/grpc-js-xds/test/xds-server.ts index 6f021c17..2aa62bdd 100644 --- a/packages/grpc-js-xds/test/xds-server.ts +++ b/packages/grpc-js-xds/test/xds-server.ts @@ -44,6 +44,7 @@ const loadedProtos = loadPackageDefinition(loadSync( 'envoy/extensions/clusters/aggregate/v3/cluster.proto', 'envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto', 'envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto', + 'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto', 'xds/type/v3/typed_struct.proto' ], { diff --git a/packages/grpc-js-xds/tsconfig.json b/packages/grpc-js-xds/tsconfig.json index c121a5f6..24212dfc 100644 --- a/packages/grpc-js-xds/tsconfig.json +++ b/packages/grpc-js-xds/tsconfig.json @@ -3,8 +3,8 @@ "compilerOptions": { "rootDir": ".", "outDir": "build", - "target": "es2017", - "lib": ["es2017"], + "target": "es2020", + "lib": ["es2020"], "module": "commonjs", "incremental": true }, diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index f2bb8bcf..aa1e6c83 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -61,6 +61,7 @@ export interface ChannelOptions { * Set the enableTrace option in TLS clients and servers */ 'grpc-node.tls_enable_trace'?: number; + 'grpc.lb.ring_hash.ring_size_cap'?: number; // eslint-disable-next-line @typescript-eslint/no-explicit-any [key: string]: any; } @@ -96,6 +97,7 @@ export const recognizedOptions = { 'grpc.service_config_disable_resolution': true, 'grpc.client_idle_timeout_ms': true, 'grpc-node.tls_enable_trace': true, + 'grpc.lb.ring_hash.ring_size_cap': true, }; export function channelOptionsEqual( diff --git a/packages/grpc-js/src/experimental.ts b/packages/grpc-js/src/experimental.ts index 870fbe28..1e7a1e14 100644 --- a/packages/grpc-js/src/experimental.ts +++ b/packages/grpc-js/src/experimental.ts @@ -26,6 +26,7 @@ export { Endpoint, endpointToString, endpointHasAddress, + EndpointMap, } from './subchannel-address'; export { ChildLoadBalancerHandler } from './load-balancer-child-handler'; export { diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 2817201e..10d865ce 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -193,6 +193,15 @@ export class InternalChannel { private readonly callTracker = new ChannelzCallTracker(); private readonly childrenTracker = new ChannelzChildrenTracker(); + /** + * Randomly generated ID to be passed to the config selector, for use by + * ring_hash in xDS. An integer distributed approximately uniformly between + * 0 and MAX_SAFE_INTEGER. + */ + private readonly randomChannelId = Math.floor( + Math.random() * Number.MAX_SAFE_INTEGER + ); + constructor( target: string, private readonly credentials: ChannelCredentials, @@ -528,7 +537,7 @@ export class InternalChannel { if (this.configSelector) { return { type: 'SUCCESS', - config: this.configSelector(method, metadata), + config: this.configSelector(method, metadata, this.randomChannelId), }; } else { if (this.currentResolutionError) { diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 8e051361..8f2097f4 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -33,10 +33,9 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; import { PickArgs, Picker, PickResult, PickResultType } from './picker'; import { Endpoint, + EndpointMap, SubchannelAddress, - endpointHasAddress, endpointToString, - subchannelAddressEqual, } from './subchannel-address'; import { BaseSubchannelWrapper, @@ -461,126 +460,9 @@ interface MapEntry { subchannelWrappers: OutlierDetectionSubchannelWrapper[]; } -interface EndpointMapEntry { - key: Endpoint; - value: MapEntry; -} - -function endpointEqualUnordered( - endpoint1: Endpoint, - endpoint2: Endpoint -): boolean { - if (endpoint1.addresses.length !== endpoint2.addresses.length) { - return false; - } - for (const address1 of endpoint1.addresses) { - let matchFound = false; - for (const address2 of endpoint2.addresses) { - if (subchannelAddressEqual(address1, address2)) { - matchFound = true; - break; - } - } - if (!matchFound) { - return false; - } - } - return true; -} - -class EndpointMap { - private map: Set = new Set(); - - get size() { - return this.map.size; - } - - getForSubchannelAddress(address: SubchannelAddress): MapEntry | undefined { - for (const entry of this.map) { - if (endpointHasAddress(entry.key, address)) { - return entry.value; - } - } - return undefined; - } - - /** - * Delete any entries in this map with keys that are not in endpoints - * @param endpoints - */ - deleteMissing(endpoints: Endpoint[]) { - for (const entry of this.map) { - let foundEntry = false; - for (const endpoint of endpoints) { - if (endpointEqualUnordered(endpoint, entry.key)) { - foundEntry = true; - } - } - if (!foundEntry) { - this.map.delete(entry); - } - } - } - - get(endpoint: Endpoint): MapEntry | undefined { - for (const entry of this.map) { - if (endpointEqualUnordered(endpoint, entry.key)) { - return entry.value; - } - } - return undefined; - } - - set(endpoint: Endpoint, mapEntry: MapEntry) { - for (const entry of this.map) { - if (endpointEqualUnordered(endpoint, entry.key)) { - entry.value = mapEntry; - return; - } - } - this.map.add({ key: endpoint, value: mapEntry }); - } - - delete(endpoint: Endpoint) { - for (const entry of this.map) { - if (endpointEqualUnordered(endpoint, entry.key)) { - this.map.delete(entry); - return; - } - } - } - - has(endpoint: Endpoint): boolean { - for (const entry of this.map) { - if (endpointEqualUnordered(endpoint, entry.key)) { - return true; - } - } - return false; - } - - *keys(): IterableIterator { - for (const entry of this.map) { - yield entry.key; - } - } - - *values(): IterableIterator { - for (const entry of this.map) { - yield entry.value; - } - } - - *entries(): IterableIterator<[Endpoint, MapEntry]> { - for (const entry of this.map) { - yield [entry.key, entry.value]; - } - } -} - export class OutlierDetectionLoadBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; - private entryMap = new EndpointMap(); + private entryMap = new EndpointMap(); private latestConfig: OutlierDetectionLoadBalancingConfig | null = null; private ejectionTimer: NodeJS.Timeout; private timerStartTime: Date | null = null; diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 9af66266..5cbd11cf 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -541,6 +541,19 @@ export class LeafLoadBalancer { this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG); } + /** + * Update the endpoint associated with this LeafLoadBalancer to a new + * endpoint. Does not trigger connection establishment if a connection + * attempt is not already in progress. + * @param newEndpoint + */ + updateEndpoint(newEndpoint: Endpoint) { + this.endpoint = newEndpoint; + if (this.latestState !== ConnectivityState.IDLE) { + this.startConnecting(); + } + } + getConnectivityState() { return this.latestState; } diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index 6474269f..ac79c9fe 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -122,25 +122,34 @@ export class UnavailablePicker implements Picker { * indicating that the pick should be tried again with the next `Picker`. Also * reports back to the load balancer that a connection should be established * once any pick is attempted. + * If the childPicker is provided, delegate to it instead of returning the + * hardcoded QUEUE pick result, but still calls exitIdle. */ export class QueuePicker { private calledExitIdle = false; // Constructed with a load balancer. Calls exitIdle on it the first time pick is called - constructor(private loadBalancer: LoadBalancer) {} + constructor( + private loadBalancer: LoadBalancer, + private childPicker?: Picker + ) {} - pick(pickArgs: PickArgs): QueuePickResult { + pick(pickArgs: PickArgs): PickResult { if (!this.calledExitIdle) { process.nextTick(() => { this.loadBalancer.exitIdle(); }); this.calledExitIdle = true; } - return { - pickResultType: PickResultType.QUEUE, - subchannel: null, - status: null, - onCallStarted: null, - onCallEnded: null, - }; + if (this.childPicker) { + return this.childPicker.pick(pickArgs); + } else { + return { + pickResultType: PickResultType.QUEUE, + subchannel: null, + status: null, + onCallStarted: null, + onCallEnded: null, + }; + } } } diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 4dfa8d13..1c84c049 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -37,7 +37,7 @@ export interface CallConfig { * https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc */ export interface ConfigSelector { - (methodName: string, metadata: Metadata): CallConfig; + (methodName: string, metadata: Metadata, channelId: number): CallConfig; } /** diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index df480400..e600047e 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -279,7 +279,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { ); // Ensure that this.exitIdle() is called by the picker if (connectivityState === ConnectivityState.IDLE) { - picker = new QueuePicker(this); + picker = new QueuePicker(this, picker); } this.currentState = connectivityState; this.channelControlHelper.updateState(connectivityState, picker); diff --git a/packages/grpc-js/src/subchannel-address.ts b/packages/grpc-js/src/subchannel-address.ts index 36fd99ea..70a7962f 100644 --- a/packages/grpc-js/src/subchannel-address.ts +++ b/packages/grpc-js/src/subchannel-address.ts @@ -122,3 +122,127 @@ export function endpointHasAddress( } return false; } + +interface EndpointMapEntry { + key: Endpoint; + value: ValueType; +} + +function endpointEqualUnordered( + endpoint1: Endpoint, + endpoint2: Endpoint +): boolean { + if (endpoint1.addresses.length !== endpoint2.addresses.length) { + return false; + } + for (const address1 of endpoint1.addresses) { + let matchFound = false; + for (const address2 of endpoint2.addresses) { + if (subchannelAddressEqual(address1, address2)) { + matchFound = true; + break; + } + } + if (!matchFound) { + return false; + } + } + return true; +} + +export class EndpointMap { + private map: Set> = new Set(); + + get size() { + return this.map.size; + } + + getForSubchannelAddress(address: SubchannelAddress): ValueType | undefined { + for (const entry of this.map) { + if (endpointHasAddress(entry.key, address)) { + return entry.value; + } + } + return undefined; + } + + /** + * Delete any entries in this map with keys that are not in endpoints + * @param endpoints + */ + deleteMissing(endpoints: Endpoint[]): ValueType[] { + const removedValues: ValueType[] = []; + for (const entry of this.map) { + let foundEntry = false; + for (const endpoint of endpoints) { + if (endpointEqualUnordered(endpoint, entry.key)) { + foundEntry = true; + } + } + if (!foundEntry) { + removedValues.push(entry.value); + this.map.delete(entry); + } + } + return removedValues; + } + + get(endpoint: Endpoint): ValueType | undefined { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + return entry.value; + } + } + return undefined; + } + + set(endpoint: Endpoint, mapEntry: ValueType) { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + entry.value = mapEntry; + return; + } + } + this.map.add({ key: endpoint, value: mapEntry }); + } + + delete(endpoint: Endpoint) { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + this.map.delete(entry); + return; + } + } + } + + has(endpoint: Endpoint): boolean { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + return true; + } + } + return false; + } + + clear() { + this.map.clear(); + } + + *keys(): IterableIterator { + for (const entry of this.map) { + yield entry.key; + } + } + + *values(): IterableIterator { + for (const entry of this.map) { + yield entry.value; + } + } + + *entries(): IterableIterator<[Endpoint, ValueType]> { + for (const entry of this.map) { + yield [entry.key, entry.value]; + } + } +}