diff --git a/packages/grpc-js/gulpfile.ts b/packages/grpc-js/gulpfile.ts index 6d8d2094..d8590036 100644 --- a/packages/grpc-js/gulpfile.ts +++ b/packages/grpc-js/gulpfile.ts @@ -67,6 +67,7 @@ const compile = checkTask(() => execNpmCommand('compile')); const copyTestFixtures = checkTask(() => ncpP(`${jsCoreDir}/test/fixtures`, `${outDir}/test/fixtures`)); const runTests = checkTask(() => { + process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION = 'true'; return gulp.src(`${outDir}/test/**/*.js`) .pipe(mocha({reporter: 'mocha-jenkins-reporter', require: ['ts-node/register']})); diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 8fa2c592..8b9275e9 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -49,6 +49,7 @@ import { Filter } from './filter'; import { ConnectivityState } from './connectivity-state'; import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz'; +import { Subchannel } from './subchannel'; /** * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args @@ -451,7 +452,7 @@ export class ChannelImplementation implements Channel { if (subchannelState === ConnectivityState.READY) { try { const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream)); - pickResult.subchannel!.startCallStream( + pickResult.subchannel?.getRealSubchannel().startCallStream( finalMetadata, callStream, [...dynamicFilters, ...pickExtraFilters] diff --git a/packages/grpc-js/src/duration.ts b/packages/grpc-js/src/duration.ts new file mode 100644 index 00000000..278c9ae5 --- /dev/null +++ b/packages/grpc-js/src/duration.ts @@ -0,0 +1,36 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * A span of time split into whole seconds and a nanosecond remainder.
 */
export interface Duration {
  seconds: number;
  nanos: number;
}

/**
 * Drop the fractional part of a number (rounding toward zero).
 *
 * `Math.trunc` is used instead of `| 0` because the bitwise operator coerces
 * its operand to a signed 32-bit integer, which silently wraps for any value
 * above 2^31 - 1 (about 24.8 days when the value is in milliseconds).
 * The `|| 0` keeps the old results for NaN and avoids returning negative zero.
 */
function truncateToInteger(value: number): number {
  return Math.trunc(value) || 0;
}

/**
 * Convert a millisecond count into a Duration.
 * @param millis Number of milliseconds
 * @returns The whole seconds in `millis`, plus the leftover milliseconds
 *     expressed in nanoseconds
 */
export function msToDuration(millis: number): Duration {
  return {
    seconds: truncateToInteger(millis / 1000),
    nanos: truncateToInteger((millis % 1000) * 1_000_000),
  };
}

/**
 * Convert a Duration into a whole number of milliseconds, discarding any
 * sub-millisecond remainder.
 * @param duration The duration to convert
 */
export function durationToMs(duration: Duration): number {
  return truncateToInteger(duration.seconds * 1000 + duration.nanos / 1_000_000);
}

/**
 * Type guard: true when `value` is a non-null object whose `seconds` and
 * `nanos` properties are both numbers. Never throws, whatever is passed in.
 */
export function isDuration(value: any): value is Duration {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof value.seconds === 'number' &&
    typeof value.nanos === 'number'
  );
}
a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -273,6 +273,7 @@ import * as resolver_uds from './resolver-uds'; import * as resolver_ip from './resolver-ip'; import * as load_balancer_pick_first from './load-balancer-pick-first'; import * as load_balancer_round_robin from './load-balancer-round-robin'; +import * as load_balancer_outlier_detection from './load-balancer-outlier-detection'; import * as channelz from './channelz'; const clientVersion = require('../../package.json').version; @@ -284,5 +285,6 @@ const clientVersion = require('../../package.json').version; resolver_ip.setup(); load_balancer_pick_first.setup(); load_balancer_round_robin.setup(); + load_balancer_outlier_detection.setup(); channelz.setup(); })(); diff --git a/packages/grpc-js/src/load-balancer-child-handler.ts b/packages/grpc-js/src/load-balancer-child-handler.ts index c5391494..50708cf9 100644 --- a/packages/grpc-js/src/load-balancer-child-handler.ts +++ b/packages/grpc-js/src/load-balancer-child-handler.ts @@ -21,12 +21,12 @@ import { LoadBalancingConfig, createLoadBalancer, } from './load-balancer'; -import { Subchannel } from './subchannel'; import { SubchannelAddress } from './subchannel-address'; import { ChannelOptions } from './channel-options'; import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; import { ChannelRef, SubchannelRef } from './channelz'; +import { SubchannelInterface } from './subchannel-interface'; const TYPE_NAME = 'child_load_balancer_helper'; @@ -40,7 +40,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer { createSubchannel( subchannelAddress: SubchannelAddress, subchannelArgs: ChannelOptions - ): Subchannel { + ): SubchannelInterface { return this.parent.channelControlHelper.createSubchannel( subchannelAddress, subchannelArgs diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts new file mode 100644 index 
00000000..ffca1c68 --- /dev/null +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -0,0 +1,569 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { ChannelOptions, connectivityState, StatusObject } from "."; +import { Call } from "./call-stream"; +import { ConnectivityState } from "./connectivity-state"; +import { Status } from "./constants"; +import { durationToMs, isDuration, msToDuration } from "./duration"; +import { ChannelControlHelper, createChildChannelControlHelper, registerLoadBalancerType } from "./experimental"; +import { BaseFilter, Filter, FilterFactory } from "./filter"; +import { getFirstUsableConfig, LoadBalancer, LoadBalancingConfig, validateLoadBalancingConfig } from "./load-balancer"; +import { ChildLoadBalancerHandler } from "./load-balancer-child-handler"; +import { PickArgs, Picker, PickResult, PickResultType, QueuePicker, UnavailablePicker } from "./picker"; +import { Subchannel } from "./subchannel"; +import { SubchannelAddress, subchannelAddressToString } from "./subchannel-address"; +import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from "./subchannel-interface"; + + +const TYPE_NAME = 'outlier_detection'; + +const OUTLIER_DETECTION_ENABLED = process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION === 'true'; + +interface SuccessRateEjectionConfig { + readonly stdev_factor: number; + readonly enforcement_percentage: number; + 
/**
 * Success-rate ejection settings. Field names are snake_case because these
 * objects are built by spreading the parsed JSON config over the defaults.
 */
interface SuccessRateEjectionConfig {
  readonly stdev_factor: number;
  readonly enforcement_percentage: number;
  readonly minimum_hosts: number;
  readonly request_volume: number;
}

/**
 * Failure-percentage ejection settings. Field names are snake_case because
 * these objects are built by spreading the parsed JSON config over the
 * defaults.
 */
interface FailurePercentageEjectionConfig {
  readonly threshold: number;
  readonly enforcement_percentage: number;
  readonly minimum_hosts: number;
  readonly request_volume: number;
}

/** Values used for any success_rate_ejection field the config omits. */
const defaultSuccessRateEjectionConfig: SuccessRateEjectionConfig = {
  stdev_factor: 1900,
  enforcement_percentage: 100,
  minimum_hosts: 5,
  request_volume: 100
};

/** Values used for any failure_percentage_ejection field the config omits. */
const defaultFailurePercentageEjectionConfig: FailurePercentageEjectionConfig = {
  threshold: 85,
  enforcement_percentage: 100,
  minimum_hosts: 5,
  request_volume: 50
};

/** The strings a `typeof` expression can be compared against here. */
type TypeofValues = 'object' | 'boolean' | 'function' | 'number' | 'string' | 'undefined';

/**
 * Check that `obj[fieldName]`, if present, has the expected `typeof`.
 * An absent field is accepted.
 * @param obj The object being validated; must support the `in` operator
 * @param fieldName The key to check
 * @param expectedType The required `typeof` result
 * @param objectName Optional parent name, used only to qualify the field
 *     name in the error message
 * @throws Error if the field is present with a different type
 */
function validateFieldType(obj: any, fieldName: string, expectedType: TypeofValues, objectName?: string) {
  if (fieldName in obj && typeof obj[fieldName] !== expectedType) {
    const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
    throw new Error(`outlier detection config ${fullFieldName} parse error: expected ${expectedType}, got ${typeof obj[fieldName]}`);
  }
}

/**
 * Check that `obj[fieldName]`, if present, is a Duration whose `seconds` is
 * within [0, 315,576,000,000] and whose `nanos` is within [0, 999,999,999].
 * An absent field is accepted, and so is a zero duration.
 * @param obj The object being validated; must support the `in` operator
 * @param fieldName The key to check
 * @param objectName Optional parent name, used only to qualify the field
 *     name in the error message
 * @throws Error if the field is present but is not a Duration or is out of
 *     range
 */
function validatePositiveDuration(obj: any, fieldName: string, objectName?: string) {
  const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
  if (fieldName in obj) {
    if (!isDuration(obj[fieldName])) {
      throw new Error(`outlier detection config ${fullFieldName} parse error: expected Duration, got ${typeof obj[fieldName]}`);
    }
    if (!(obj[fieldName].seconds >= 0 && obj[fieldName].seconds <= 315_576_000_000 && obj[fieldName].nanos >= 0 && obj[fieldName].nanos <= 999_999_999)) {
      throw new Error(`outlier detection config ${fullFieldName} parse error: values out of range for non-negative Duration`);
    }
  }
}

/**
 * Check that `obj[fieldName]`, if present, is a number within [0, 100].
 * An absent field is accepted. NaN is rejected because the range test is
 * written so that any failed comparison throws.
 * @param obj The object being validated; must support the `in` operator
 * @param fieldName The key to check
 * @param objectName Optional parent name, used only to qualify the field
 *     name in the error message
 * @throws Error if the field is present but is not a number or is out of
 *     range
 */
function validatePercentage(obj: any, fieldName: string, objectName?: string) {
  const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
  validateFieldType(obj, fieldName, 'number', objectName);
  if (fieldName in obj && !(obj[fieldName] >= 0 && obj[fieldName] <= 100)) {
    throw new Error(`outlier detection config ${fullFieldName} parse error: value out of range for percentage (0-100)`);
  }
}
/**
 * Parsed, validated configuration for the outlier detection load balancing
 * policy. All time values are held in milliseconds internally and converted
 * back to Duration objects by `toJsonObject`.
 */
export class OutlierDetectionLoadBalancingConfig implements LoadBalancingConfig {
  constructor(
    private readonly intervalMs: number,
    private readonly baseEjectionTimeMs: number,
    private readonly maxEjectionTimeMs: number,
    private readonly maxEjectionPercent: number,
    private readonly successRateEjection: SuccessRateEjectionConfig | null,
    private readonly failurePercentageEjection: FailurePercentageEjectionConfig | null,
    private readonly childPolicy: LoadBalancingConfig[]
  ) {}

  getLoadBalancerName(): string {
    return TYPE_NAME;
  }

  /**
   * Serialize to the JSON shape accepted by `createFromJson`. Ejection
   * sections that are not configured are emitted as `null`.
   */
  toJsonObject(): object {
    return {
      interval: msToDuration(this.intervalMs),
      base_ejection_time: msToDuration(this.baseEjectionTimeMs),
      max_ejection_time: msToDuration(this.maxEjectionTimeMs),
      max_ejection_percent: this.maxEjectionPercent,
      success_rate_ejection: this.successRateEjection,
      failure_percentage_ejection: this.failurePercentageEjection,
      child_policy: this.childPolicy.map(policy => policy.toJsonObject())
    };
  }

  getIntervalMs(): number {
    return this.intervalMs;
  }
  getBaseEjectionTimeMs(): number {
    return this.baseEjectionTimeMs;
  }
  getMaxEjectionTimeMs(): number {
    return this.maxEjectionTimeMs;
  }
  getMaxEjectionPercent(): number {
    return this.maxEjectionPercent;
  }
  getSuccessRateEjectionConfig(): SuccessRateEjectionConfig | null {
    return this.successRateEjection;
  }
  getFailurePercentageEjectionConfig(): FailurePercentageEjectionConfig | null {
    return this.failurePercentageEjection;
  }
  getChildPolicy(): LoadBalancingConfig[] {
    return this.childPolicy;
  }

  /**
   * Build a config from its JSON representation, filling in defaults for
   * omitted fields (interval 10s, base ejection time 30s, max ejection time
   * 300s, max ejection percent 10).
   *
   * An ejection section that is absent, `null` or `undefined` is treated as
   * "not configured"; `null` has to be accepted because that is what
   * `toJsonObject` emits for an unset section.
   *
   * @param obj The parsed JSON object
   * @throws Error if any field has the wrong type or is out of range, or if
   *     `child_policy` is not an array
   */
  static createFromJson(obj: any): OutlierDetectionLoadBalancingConfig {
    validatePositiveDuration(obj, 'interval');
    validatePositiveDuration(obj, 'base_ejection_time');
    validatePositiveDuration(obj, 'max_ejection_time');
    validatePercentage(obj, 'max_ejection_percent');

    const successRateJson = obj.success_rate_ejection;
    if (successRateJson !== undefined && successRateJson !== null) {
      if (typeof successRateJson !== 'object') {
        throw new Error('outlier detection config success_rate_ejection must be an object');
      }
      validateFieldType(successRateJson, 'stdev_factor', 'number', 'success_rate_ejection');
      validatePercentage(successRateJson, 'enforcement_percentage', 'success_rate_ejection');
      validateFieldType(successRateJson, 'minimum_hosts', 'number', 'success_rate_ejection');
      validateFieldType(successRateJson, 'request_volume', 'number', 'success_rate_ejection');
    }

    const failurePercentageJson = obj.failure_percentage_ejection;
    if (failurePercentageJson !== undefined && failurePercentageJson !== null) {
      if (typeof failurePercentageJson !== 'object') {
        throw new Error('outlier detection config failure_percentage_ejection must be an object');
      }
      validatePercentage(failurePercentageJson, 'threshold', 'failure_percentage_ejection');
      validatePercentage(failurePercentageJson, 'enforcement_percentage', 'failure_percentage_ejection');
      validateFieldType(failurePercentageJson, 'minimum_hosts', 'number', 'failure_percentage_ejection');
      validateFieldType(failurePercentageJson, 'request_volume', 'number', 'failure_percentage_ejection');
    }

    // Fail with a descriptive message instead of a TypeError from `.map`.
    if (!Array.isArray(obj.child_policy)) {
      throw new Error('outlier detection config child_policy must be an array');
    }

    return new OutlierDetectionLoadBalancingConfig(
      obj.interval ? durationToMs(obj.interval) : 10_000,
      obj.base_ejection_time ? durationToMs(obj.base_ejection_time) : 30_000,
      obj.max_ejection_time ? durationToMs(obj.max_ejection_time) : 300_000,
      obj.max_ejection_percent ?? 10,
      successRateJson ? {...defaultSuccessRateEjectionConfig, ...successRateJson} : null,
      failurePercentageJson ? {...defaultFailurePercentageEjectionConfig, ...failurePercentageJson} : null,
      obj.child_policy.map(validateLoadBalancingConfig)
    );
  }
}
/**
 * Per-address bookkeeping kept by the outlier detection balancer.
 */
interface MapEntry {
  /** Success/failure tallies for calls sent to this address. */
  counter: CallCounter;
  /** When the address was ejected, or null while it is not ejected. */
  currentEjectionTimestamp: Date | null;
  /** Scales the base ejection time when deciding how long to stay ejected. */
  ejectionTimeMultiplier: number;
  /** Every live subchannel wrapper created for this address. */
  subchannelWrappers: OutlierDetectionSubchannelWrapper[];
}

/**
 * Wraps a child subchannel so that its reported connectivity state can be
 * overridden: while ejected, state changes from the child are recorded but
 * not forwarded, and listeners are told the state is TRANSIENT_FAILURE.
 */
class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
  /** Most recent state reported by the wrapped subchannel. */
  private childSubchannelState: ConnectivityState = ConnectivityState.IDLE;
  private stateListeners: ConnectivityStateListener[] = [];
  private ejected: boolean = false;
  private refCount: number = 0;

  constructor(childSubchannel: SubchannelInterface, private mapEntry?: MapEntry) {
    super(childSubchannel);
    childSubchannel.addConnectivityStateListener((subchannel, previousState, newState) => {
      // Always track the real state so it can be reported on uneject.
      this.childSubchannelState = newState;
      if (!this.ejected) {
        this.notifyListeners(previousState, newState);
      }
    });
  }

  /**
   * Call every registered listener with this wrapper as the subchannel.
   * Iterates over a shallow copy so that a listener which adds or removes
   * listeners while being notified cannot cause another one to be skipped.
   */
  private notifyListeners(previousState: ConnectivityState, newState: ConnectivityState) {
    for (const listener of [...this.stateListeners]) {
      listener(this, previousState, newState);
    }
  }

  /**
   * Add a listener function to be called whenever the wrapper's
   * connectivity state changes.
   * @param listener
   */
  addConnectivityStateListener(listener: ConnectivityStateListener) {
    this.stateListeners.push(listener);
  }

  /**
   * Remove a listener previously added with `addConnectivityStateListener`
   * @param listener A reference to a function previously passed to
   *     `addConnectivityStateListener`
   */
  removeConnectivityStateListener(listener: ConnectivityStateListener) {
    const listenerIndex = this.stateListeners.indexOf(listener);
    if (listenerIndex > -1) {
      this.stateListeners.splice(listenerIndex, 1);
    }
  }

  ref() {
    this.child.ref();
    this.refCount += 1;
  }

  /**
   * Release one reference. Once the wrapper's own count reaches zero it
   * removes itself from its map entry's wrapper list, so later eject/uneject
   * passes no longer touch it.
   */
  unref() {
    this.child.unref();
    this.refCount -= 1;
    if (this.refCount <= 0) {
      if (this.mapEntry) {
        const index = this.mapEntry.subchannelWrappers.indexOf(this);
        if (index >= 0) {
          this.mapEntry.subchannelWrappers.splice(index, 1);
        }
      }
    }
  }

  /**
   * Mark the wrapper as ejected and tell listeners it moved from the child's
   * last known state to TRANSIENT_FAILURE.
   */
  eject() {
    this.ejected = true;
    this.notifyListeners(this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE);
  }

  /**
   * Clear the ejected mark and tell listeners it moved from
   * TRANSIENT_FAILURE back to the child's last known state.
   */
  uneject() {
    this.ejected = false;
    this.notifyListeners(ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState);
  }

  getMapEntry(): MapEntry | undefined {
    return this.mapEntry;
  }

  getWrappedSubchannel(): SubchannelInterface {
    return this.child;
  }
}
/** Success and failure tallies collected during one measurement window. */
interface CallCountBucket {
  success: number;
  failure: number;
}

/** Produce a fresh bucket with both tallies at zero. */
function createEmptyBucket(): CallCountBucket {
  const bucket: CallCountBucket = { success: 0, failure: 0 };
  return bucket;
}

/**
 * Two-window call tally. New results always land in the current window;
 * `switchBuckets` freezes that window as the "last" one, which is what the
 * getters report, and starts an empty current window.
 */
class CallCounter {
  private currentWindow: CallCountBucket = createEmptyBucket();
  private previousWindow: CallCountBucket = createEmptyBucket();

  /** Record one successful call in the current window. */
  addSuccess(): void {
    this.currentWindow.success++;
  }

  /** Record one failed call in the current window. */
  addFailure(): void {
    this.currentWindow.failure++;
  }

  /** Close the current window and begin a new, empty one. */
  switchBuckets(): void {
    const finishedWindow = this.currentWindow;
    this.currentWindow = createEmptyBucket();
    this.previousWindow = finishedWindow;
  }

  /** Successes counted in the most recently closed window. */
  getLastSuccesses(): number {
    const { success } = this.previousWindow;
    return success;
  }

  /** Failures counted in the most recently closed window. */
  getLastFailures(): number {
    const { failure } = this.previousWindow;
    return failure;
  }
}
/**
 * Call filter that records the outcome of one call in a CallCounter: a
 * trailing status of OK counts as a success, anything else as a failure.
 * The status itself is passed through unchanged.
 */
class OutlierDetectionCounterFilter extends BaseFilter implements Filter {
  constructor(private callCounter: CallCounter) {
    super();
  }
  receiveTrailers(status: StatusObject): StatusObject {
    if (status.code === Status.OK) {
      this.callCounter.addSuccess();
    } else {
      this.callCounter.addFailure();
    }
    return status;
  }
}

/** Creates counter filters that all report to the same CallCounter. */
class OutlierDetectionCounterFilterFactory implements FilterFactory<OutlierDetectionCounterFilter> {
  constructor(private callCounter: CallCounter) {}
  createFilter(callStream: Call): OutlierDetectionCounterFilter {
    return new OutlierDetectionCounterFilter(this.callCounter);
  }
}

/**
 * Picker that delegates to the child policy's picker and, for a completed
 * pick on a tracked address, unwraps the subchannel and appends a filter
 * factory that counts the call's result against that address.
 */
class OutlierDetectionPicker implements Picker {
  constructor(private wrappedPicker: Picker) {}
  pick(pickArgs: PickArgs): PickResult {
    const wrappedPick = this.wrappedPicker.pick(pickArgs);
    if (wrappedPick.pickResultType !== PickResultType.COMPLETE) {
      return wrappedPick;
    }
    // Subchannels handed to the child policy are created by this balancer's
    // createSubchannel override, which always returns a wrapper.
    const subchannelWrapper = wrappedPick.subchannel as OutlierDetectionSubchannelWrapper;
    const mapEntry = subchannelWrapper.getMapEntry();
    if (!mapEntry) {
      return wrappedPick;
    }
    return {
      ...wrappedPick,
      subchannel: subchannelWrapper.getWrappedSubchannel(),
      extraFilterFactories: [...wrappedPick.extraFilterFactories, new OutlierDetectionCounterFilterFactory(mapEntry.counter)]
    };
  }
}

/**
 * Load balancer that wraps a child policy, counts call results per address,
 * and periodically ejects addresses whose results are outliers. Ejected
 * addresses are reported to the child policy as TRANSIENT_FAILURE.
 */
export class OutlierDetectionLoadBalancer implements LoadBalancer {
  private childBalancer: ChildLoadBalancerHandler;
  /** Per-address state, keyed by `subchannelAddressToString(address)`. */
  private addressMap: Map<string, MapEntry> = new Map<string, MapEntry>();
  private latestConfig: OutlierDetectionLoadBalancingConfig | null = null;
  private ejectionTimer: NodeJS.Timer;

  constructor(channelControlHelper: ChannelControlHelper) {
    this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
      createSubchannel: (subchannelAddress: SubchannelAddress, subchannelArgs: ChannelOptions) => {
        const originalSubchannel = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
        const mapEntry = this.addressMap.get(subchannelAddressToString(subchannelAddress));
        const subchannelWrapper = new OutlierDetectionSubchannelWrapper(originalSubchannel, mapEntry);
        if (mapEntry) {
          // A wrapper created while its address is ejected must start out
          // ejected too, otherwise it would bypass the ejection entirely.
          if (mapEntry.currentEjectionTimestamp !== null) {
            subchannelWrapper.eject();
          }
          mapEntry.subchannelWrappers.push(subchannelWrapper);
        }
        return subchannelWrapper;
      },
      updateState: (connectivityState: ConnectivityState, picker: Picker) => {
        // Only a READY picker can complete picks, so only it needs wrapping.
        if (connectivityState === ConnectivityState.READY) {
          channelControlHelper.updateState(connectivityState, new OutlierDetectionPicker(picker));
        } else {
          channelControlHelper.updateState(connectivityState, picker);
        }
      }
    }));
    // Give the field a valid, already-cancelled timer handle; the real
    // interval is started by the first updateAddressList call.
    this.ejectionTimer = setInterval(() => {}, 0);
    clearInterval(this.ejectionTimer);
  }

  /**
   * Percentage (0-100) of tracked addresses that are currently ejected.
   * Returns 0 when no addresses are tracked, rather than NaN.
   */
  private getCurrentEjectionPercent() {
    if (this.addressMap.size === 0) {
      return 0;
    }
    let ejectionCount = 0;
    for (const mapEntry of this.addressMap.values()) {
      if (mapEntry.currentEjectionTimestamp !== null) {
        ejectionCount += 1;
      }
    }
    return (ejectionCount * 100) / this.addressMap.size;
  }

  /**
   * Eject addresses whose success rate in the last window is more than
   * `stdev_factor / 1000` standard deviations below the mean success rate of
   * all addresses that met the request volume.
   * @param ejectionTimestamp Time recorded when the check pass started
   */
  private runSuccessRateCheck(ejectionTimestamp: Date) {
    if (!this.latestConfig) {
      return;
    }
    const successRateConfig = this.latestConfig.getSuccessRateEjectionConfig();
    if (!successRateConfig) {
      return;
    }
    // Step 1
    const targetRequestVolume = successRateConfig.request_volume;
    const successRates: number[] = [];
    for (const mapEntry of this.addressMap.values()) {
      const successes = mapEntry.counter.getLastSuccesses();
      const failures = mapEntry.counter.getLastFailures();
      const total = successes + failures;
      // total > 0 keeps a 0/0 = NaN rate out of the statistics when
      // request_volume is configured as 0.
      if (total > 0 && total >= targetRequestVolume) {
        successRates.push(successes / total);
      }
    }
    if (successRates.length === 0 || successRates.length < successRateConfig.minimum_hosts) {
      return;
    }

    // Step 2: mean and (population) standard deviation of the sampled rates
    const successRateMean = successRates.reduce((a, b) => a + b, 0) / successRates.length;
    let squaredDeviationSum = 0;
    for (const rate of successRates) {
      const deviation = rate - successRateMean;
      squaredDeviationSum += deviation * deviation;
    }
    const successRateStdev = Math.sqrt(squaredDeviationSum / successRates.length);
    const ejectionThreshold = successRateMean - successRateStdev * (successRateConfig.stdev_factor / 1000);

    // Step 3
    for (const mapEntry of this.addressMap.values()) {
      // Step 3.i
      if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) {
        break;
      }
      // Step 3.ii
      const successes = mapEntry.counter.getLastSuccesses();
      const failures = mapEntry.counter.getLastFailures();
      if (successes + failures < targetRequestVolume) {
        continue;
      }
      // Step 3.iii
      const successRate = successes / (successes + failures);
      if (successRate < ejectionThreshold) {
        const randomNumber = Math.random() * 100;
        if (randomNumber < successRateConfig.enforcement_percentage) {
          this.eject(mapEntry, ejectionTimestamp);
        }
      }
    }
  }

  /**
   * Eject addresses whose failure percentage in the last window exceeds the
   * configured threshold.
   * @param ejectionTimestamp Time recorded when the check pass started
   */
  private runFailurePercentageCheck(ejectionTimestamp: Date) {
    if (!this.latestConfig) {
      return;
    }
    const failurePercentageConfig = this.latestConfig.getFailurePercentageEjectionConfig();
    if (!failurePercentageConfig) {
      return;
    }
    // Step 1
    if (this.addressMap.size < failurePercentageConfig.minimum_hosts) {
      return;
    }

    // Step 2
    for (const mapEntry of this.addressMap.values()) {
      // Step 2.i
      if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) {
        break;
      }
      // Step 2.ii
      const successes = mapEntry.counter.getLastSuccesses();
      const failures = mapEntry.counter.getLastFailures();
      if (successes + failures < failurePercentageConfig.request_volume) {
        continue;
      }
      // Step 2.iii
      const failurePercentage = (failures * 100) / (failures + successes);
      if (failurePercentage > failurePercentageConfig.threshold) {
        const randomNumber = Math.random() * 100;
        if (randomNumber < failurePercentageConfig.enforcement_percentage) {
          this.eject(mapEntry, ejectionTimestamp);
        }
      }
    }
  }

  /**
   * Mark an address as ejected as of `ejectionTimestamp`, bump its ejection
   * time multiplier, and eject every wrapper created for it.
   */
  private eject(mapEntry: MapEntry, ejectionTimestamp: Date) {
    mapEntry.currentEjectionTimestamp = ejectionTimestamp;
    mapEntry.ejectionTimeMultiplier += 1;
    for (const subchannelWrapper of mapEntry.subchannelWrappers) {
      subchannelWrapper.eject();
    }
  }

  /** Clear an address's ejection and uneject every wrapper created for it. */
  private uneject(mapEntry: MapEntry) {
    mapEntry.currentEjectionTimestamp = null;
    for (const subchannelWrapper of mapEntry.subchannelWrappers) {
      subchannelWrapper.uneject();
    }
  }

  /**
   * One pass of the interval timer: close every address's counting window,
   * run both ejection checks, then either decay the multiplier of addresses
   * that are not ejected or uneject addresses whose ejection time is over.
   */
  private runChecks() {
    const ejectionTimestamp = new Date();

    for (const mapEntry of this.addressMap.values()) {
      mapEntry.counter.switchBuckets();
    }

    if (!this.latestConfig) {
      return;
    }

    this.runSuccessRateCheck(ejectionTimestamp);
    this.runFailurePercentageCheck(ejectionTimestamp);

    for (const mapEntry of this.addressMap.values()) {
      if (mapEntry.currentEjectionTimestamp === null) {
        if (mapEntry.ejectionTimeMultiplier > 0) {
          mapEntry.ejectionTimeMultiplier -= 1;
        }
      } else {
        const baseEjectionTimeMs = this.latestConfig.getBaseEjectionTimeMs();
        const maxEjectionTimeMs = this.latestConfig.getMaxEjectionTimeMs();
        const ejectionDurationMs = Math.min(
          baseEjectionTimeMs * mapEntry.ejectionTimeMultiplier,
          Math.max(baseEjectionTimeMs, maxEjectionTimeMs)
        );
        // Plain epoch arithmetic: Date#setMilliseconds works in local time
        // and would shift the result across a daylight-saving change.
        const returnTimeMs = mapEntry.currentEjectionTimestamp.getTime() + ejectionDurationMs;
        if (returnTimeMs < Date.now()) {
          this.uneject(mapEntry);
        }
      }
    }
  }

  /**
   * Sync the tracked address set with `addressList`, forward the update to
   * the child policy, and (re)start the check timer when the interval is new
   * or has changed. Updates whose config is not an
   * OutlierDetectionLoadBalancingConfig are ignored.
   */
  updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
    if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) {
      return;
    }
    const subchannelAddresses = new Set<string>();
    for (const address of addressList) {
      subchannelAddresses.add(subchannelAddressToString(address));
    }
    for (const address of subchannelAddresses) {
      if (!this.addressMap.has(address)) {
        this.addressMap.set(address, {
          counter: new CallCounter(),
          currentEjectionTimestamp: null,
          ejectionTimeMultiplier: 0,
          subchannelWrappers: []
        });
      }
    }
    for (const key of this.addressMap.keys()) {
      if (!subchannelAddresses.has(key)) {
        this.addressMap.delete(key);
      }
    }
    const childPolicy: LoadBalancingConfig = getFirstUsableConfig(
      lbConfig.getChildPolicy(),
      true
    );
    this.childBalancer.updateAddressList(addressList, childPolicy, attributes);

    if (this.latestConfig === null || this.latestConfig.getIntervalMs() !== lbConfig.getIntervalMs()) {
      clearInterval(this.ejectionTimer);
      this.ejectionTimer = setInterval(() => this.runChecks(), lbConfig.getIntervalMs());
    }
    this.latestConfig = lbConfig;
  }
  exitIdle(): void {
    this.childBalancer.exitIdle();
  }
  resetBackoff(): void {
    this.childBalancer.resetBackoff();
  }
  destroy(): void {
    // Stop the periodic checks; a live interval would keep firing (and keep
    // the process alive) after the balancer is discarded.
    clearInterval(this.ejectionTimer);
    this.childBalancer.destroy();
  }
  getTypeName(): string {
    return TYPE_NAME;
  }
}

/**
 * Register the outlier detection policy, but only when it was enabled via
 * the GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION environment variable.
 */
export function setup() {
  if (OUTLIER_DETECTION_ENABLED) {
    registerLoadBalancerType(TYPE_NAME, OutlierDetectionLoadBalancer, OutlierDetectionLoadBalancingConfig);
  }
}
newline at end of file diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 6cdb7cb9..c7033f58 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -31,13 +31,13 @@ import { PickResultType, UnavailablePicker, } from './picker'; -import { Subchannel, ConnectivityStateListener } from './subchannel'; import { SubchannelAddress, subchannelAddressToString, } from './subchannel-address'; import * as logging from './logging'; import { LogVerbosity } from './constants'; +import { SubchannelInterface, ConnectivityStateListener } from './subchannel-interface'; const TRACER_NAME = 'pick_first'; @@ -77,7 +77,7 @@ export class PickFirstLoadBalancingConfig implements LoadBalancingConfig { * picked subchannel. */ class PickFirstPicker implements Picker { - constructor(private subchannel: Subchannel) {} + constructor(private subchannel: SubchannelInterface) {} pick(pickArgs: PickArgs): CompletePickResult { return { @@ -107,7 +107,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { * The list of subchannels this load balancer is currently attempting to * connect to. */ - private subchannels: Subchannel[] = []; + private subchannels: SubchannelInterface[] = []; /** * The current connectivity state of the load balancer. */ @@ -124,7 +124,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { * and only if the load balancer's current state is READY. In that case, * the subchannel's current state is also READY. */ - private currentPick: Subchannel | null = null; + private currentPick: SubchannelInterface | null = null; /** * Listener callback attached to each subchannel in the `subchannels` list * while establishing a connection. 
@@ -157,7 +157,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { [ConnectivityState.TRANSIENT_FAILURE]: 0, }; this.subchannelStateListener = ( - subchannel: Subchannel, + subchannel: SubchannelInterface, previousState: ConnectivityState, newState: ConnectivityState ) => { @@ -219,7 +219,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { } }; this.pickedSubchannelStateListener = ( - subchannel: Subchannel, + subchannel: SubchannelInterface, previousState: ConnectivityState, newState: ConnectivityState ) => { @@ -310,7 +310,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { }, CONNECTION_DELAY_INTERVAL_MS); } - private pickSubchannel(subchannel: Subchannel) { + private pickSubchannel(subchannel: SubchannelInterface) { trace('Pick subchannel with address ' + subchannel.getAddress()); if (this.currentPick !== null) { this.currentPick.unref(); diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index 918afab2..8a4094a0 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -30,13 +30,13 @@ import { PickResultType, UnavailablePicker, } from './picker'; -import { Subchannel, ConnectivityStateListener } from './subchannel'; import { SubchannelAddress, subchannelAddressToString, } from './subchannel-address'; import * as logging from './logging'; import { LogVerbosity } from './constants'; +import { ConnectivityStateListener, SubchannelInterface } from './subchannel-interface'; const TRACER_NAME = 'round_robin'; @@ -67,7 +67,7 @@ class RoundRobinLoadBalancingConfig implements LoadBalancingConfig { class RoundRobinPicker implements Picker { constructor( - private readonly subchannelList: Subchannel[], + private readonly subchannelList: SubchannelInterface[], private nextIndex = 0 ) {} @@ -88,7 +88,7 @@ class RoundRobinPicker implements Picker { * balancer implementation to preserve this part of the 
picker state if * possible when a subchannel connects or disconnects. */ - peekNextSubchannel(): Subchannel { + peekNextSubchannel(): SubchannelInterface { return this.subchannelList[this.nextIndex]; } } @@ -102,7 +102,7 @@ interface ConnectivityStateCounts { } export class RoundRobinLoadBalancer implements LoadBalancer { - private subchannels: Subchannel[] = []; + private subchannels: SubchannelInterface[] = []; private currentState: ConnectivityState = ConnectivityState.IDLE; @@ -121,7 +121,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer { [ConnectivityState.TRANSIENT_FAILURE]: 0, }; this.subchannelStateListener = ( - subchannel: Subchannel, + subchannel: SubchannelInterface, previousState: ConnectivityState, newState: ConnectivityState ) => { diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index 19b79764..48930c7d 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -21,6 +21,7 @@ import { SubchannelAddress } from './subchannel-address'; import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; import { ChannelRef, SubchannelRef } from './channelz'; +import { SubchannelInterface } from './subchannel-interface'; /** * A collection of functions associated with a channel that a load balancer @@ -35,7 +36,7 @@ export interface ChannelControlHelper { createSubchannel( subchannelAddress: SubchannelAddress, subchannelArgs: ChannelOptions - ): Subchannel; + ): SubchannelInterface; /** * Passes a new subchannel picker up to the channel. 
This is called if either * the connectivity state changes or if a different picker is needed for any diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index 7aeed89b..f366a691 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -21,6 +21,7 @@ import { Metadata } from './metadata'; import { Status } from './constants'; import { LoadBalancer } from './load-balancer'; import { FilterFactory, Filter } from './filter'; +import { SubchannelInterface } from './subchannel-interface'; export enum PickResultType { COMPLETE, @@ -36,7 +37,7 @@ export interface PickResult { * `pickResultType` is COMPLETE. If null, indicates that the call should be * dropped. */ - subchannel: Subchannel | null; + subchannel: SubchannelInterface | null; /** * The status object to end the call with. Populated if and only if * `pickResultType` is TRANSIENT_FAILURE. @@ -53,7 +54,7 @@ export interface PickResult { export interface CompletePickResult extends PickResult { pickResultType: PickResultType.COMPLETE; - subchannel: Subchannel | null; + subchannel: SubchannelInterface | null; status: null; extraFilterFactories: FilterFactory[]; onCallStarted: (() => void) | null; diff --git a/packages/grpc-js/src/service-config.ts b/packages/grpc-js/src/service-config.ts index 3f0a0086..f310597e 100644 --- a/packages/grpc-js/src/service-config.ts +++ b/packages/grpc-js/src/service-config.ts @@ -27,6 +27,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import * as os from 'os'; +import { Duration } from './duration'; import { LoadBalancingConfig, validateLoadBalancingConfig, @@ -37,11 +38,6 @@ export interface MethodConfigName { method?: string; } -export interface Duration { - seconds: number; - nanos: number; -} - export interface MethodConfig { name: MethodConfigName[]; waitForReady?: boolean; diff --git a/packages/grpc-js/src/subchannel-interface.ts b/packages/grpc-js/src/subchannel-interface.ts new file mode 100644 index 
00000000..082a8b3c --- /dev/null +++ b/packages/grpc-js/src/subchannel-interface.ts @@ -0,0 +1,82 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { SubchannelRef } from "./channelz"; +import { ConnectivityState } from "./connectivity-state"; +import { Subchannel } from "./subchannel"; + +export type ConnectivityStateListener = ( + subchannel: SubchannelInterface, + previousState: ConnectivityState, + newState: ConnectivityState +) => void; + +/** + * This is an interface for load balancing policies to use to interact with + * subchannels. This allows load balancing policies to wrap and unwrap + * subchannels. + * + * Any load balancing policy that wraps subchannels must unwrap the subchannel + * in the picker, so that other load balancing policies consistently have + * access to their own wrapper objects. 
+ */ +export interface SubchannelInterface { + getConnectivityState(): ConnectivityState; + addConnectivityStateListener(listener: ConnectivityStateListener): void; + removeConnectivityStateListener(listener: ConnectivityStateListener): void; + startConnecting(): void; + getAddress(): string; + ref(): void; + unref(): void; + getChannelzRef(): SubchannelRef; + /** + * If this is a wrapper, return the wrapped subchannel, otherwise return this + */ + getRealSubchannel(): Subchannel; +} + +export abstract class BaseSubchannelWrapper implements SubchannelInterface { + constructor(protected child: SubchannelInterface) {} + + getConnectivityState(): ConnectivityState { + return this.child.getConnectivityState(); + } + addConnectivityStateListener(listener: ConnectivityStateListener): void { + this.child.addConnectivityStateListener(listener); + } + removeConnectivityStateListener(listener: ConnectivityStateListener): void { + this.child.removeConnectivityStateListener(listener); + } + startConnecting(): void { + this.child.startConnecting(); + } + getAddress(): string { + return this.child.getAddress(); + } + ref(): void { + this.child.ref(); + } + unref(): void { + this.child.unref(); + } + getChannelzRef(): SubchannelRef { + return this.child.getChannelzRef(); + } + getRealSubchannel(): Subchannel { + return this.child.getRealSubchannel(); + } +} \ No newline at end of file diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 6f9471bb..3a1f7048 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -37,6 +37,7 @@ import { subchannelAddressToString, } from './subchannel-address'; import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz'; +import { ConnectivityStateListener } from './subchannel-interface'; const clientVersion = 
require('../../package.json').version; @@ -54,12 +55,6 @@ const BACKOFF_JITTER = 0.2; const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); const KEEPALIVE_TIMEOUT_MS = 20000; -export type ConnectivityStateListener = ( - subchannel: Subchannel, - previousState: ConnectivityState, - newState: ConnectivityState -) => void; - export interface SubchannelCallStatsTracker { addMessageSent(): void; addMessageReceived(): void; @@ -949,4 +944,8 @@ export class Subchannel { getChannelzRef(): SubchannelRef { return this.channelzRef; } + + getRealSubchannel(): this { + return this; + } } diff --git a/packages/grpc-js/test/test-outlier-detection.ts b/packages/grpc-js/test/test-outlier-detection.ts new file mode 100644 index 00000000..977a3058 --- /dev/null +++ b/packages/grpc-js/test/test-outlier-detection.ts @@ -0,0 +1,121 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +import * as assert from 'assert'; +import * as path from 'path'; +import * as grpc from '../src'; +import { loadProtoFile } from './common'; + +function multiDone(done: Mocha.Done, target: number) { + let count = 0; + return (error?: any) => { + if (error) { + done(error); + } + count++; + if (count >= target) { + done(); + } + } +} + +const defaultOutlierDetectionServiceConfig = { + methodConfig: [], + loadBalancingConfig: [ + { + outlier_detection: { + success_rate_ejection: {}, + failure_percentage_ejection: {}, + child_policy: [{round_robin: {}}] + } + } + ] +}; + +const defaultOutlierDetectionServiceConfigString = JSON.stringify(defaultOutlierDetectionServiceConfig); + +const goodService = { + echo: (call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { + callback(null, call.request) + } +}; + +const badService = { + echo: (call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { + callback({ + code: grpc.status.PERMISSION_DENIED, + details: 'Permission denied' + }) + } +} + +const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); +const EchoService = loadProtoFile(protoFile) + .EchoService as grpc.ServiceClientConstructor; + +describe('Outlier detection', () => { + const GOOD_PORTS = 4; + let goodServer: grpc.Server; + let badServer: grpc.Server; + const goodPorts: number[] = []; + let badPort: number; + before(done => { + const eachDone = multiDone(() => { + goodServer.start(); + badServer.start(); + done(); + }, GOOD_PORTS + 1); + goodServer = new grpc.Server(); + goodServer.addService(EchoService.service, goodService); + for (let i = 0; i < GOOD_PORTS; i++) { + goodServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => { + if (error) { + eachDone(error); + return; + } + goodPorts.push(port); + eachDone(); + }); + } + badServer = new grpc.Server(); + badServer.addService(EchoService.service, badService); + badServer.bindAsync('localhost:0', 
grpc.ServerCredentials.createInsecure(), (error, port) => { + if (error) { + eachDone(error); + return; + } + badPort = port; + eachDone(); + }); + }); + after(() => { + goodServer.forceShutdown(); + badServer.forceShutdown(); + }); + + it('Should allow normal operation with one server', done => { + const client = new EchoService(`localhost:${goodPorts[0]}`, grpc.credentials.createInsecure(), {'grpc.service_config': defaultOutlierDetectionServiceConfigString}); + client.echo( + { value: 'test value', value2: 3 }, + (error: grpc.ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + done(); + } + ); + }); +}); \ No newline at end of file