diff --git a/packages/grpc-js-xds/src/environment.ts b/packages/grpc-js-xds/src/environment.ts index b8b518da..250f791a 100644 --- a/packages/grpc-js-xds/src/environment.ts +++ b/packages/grpc-js-xds/src/environment.ts @@ -16,4 +16,4 @@ */ export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION ?? 'true') === 'true'; -export const EXPERIMENTAL_OUTLIER_DETECTION = process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION === 'true'; \ No newline at end of file +export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true'; \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts index 91cb6f30..9b7b9cd5 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -18,6 +18,7 @@ import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { isIPv4, isIPv6 } from "net"; import { Locality__Output } from "../generated/envoy/config/core/v3/Locality"; +import { SocketAddress__Output } from "../generated/envoy/config/core/v3/SocketAddress"; import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment"; import { Any__Output } from "../generated/google/protobuf/Any"; import { BaseXdsStreamState, HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; @@ -32,6 +33,10 @@ function localitiesEqual(a: Locality__Output, b: Locality__Output) { return a.region === b.region && a.sub_zone === b.sub_zone && a.zone === b.zone; } +function addressesEqual(a: SocketAddress__Output, b: SocketAddress__Output) { + return a.address === b.address && a.port_value === b.port_value; +} + export class EdsState extends BaseXdsStreamState implements XdsStreamState { protected getResourceName(resource: ClusterLoadAssignment__Output): string { return resource.cluster_name; @@ -50,6 +55,8 @@ export class EdsState extends BaseXdsStreamState */ public validateResponse(message: ClusterLoadAssignment__Output) { const seenLocalities: {locality: Locality__Output, priority: number}[] = []; + const seenAddresses: SocketAddress__Output[] = []; + const priorityTotalWeights: Map = new Map(); for (const endpoint of message.endpoints) { if (!endpoint.locality) { return false; @@ -71,6 +78,23 @@ export class EdsState extends BaseXdsStreamState if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { return false; } + for (const address of seenAddresses) { + if (addressesEqual(socketAddress, address)) { + return false; + } + } + seenAddresses.push(socketAddress); + } + priorityTotalWeights.set(endpoint.priority, (priorityTotalWeights.get(endpoint.priority) ?? 0) + (endpoint.load_balancing_weight?.value ?? 0)); + } + for (const totalWeight of priorityTotalWeights.values()) { + if (totalWeight >= 1<<32) { + return false; + } + } + for (const priority of priorityTotalWeights.keys()) { + if (priority > 0 && !priorityTotalWeights.has(priority - 1)) { + return false; } } return true; diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index 77a84469..a5d3c47c 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -89,6 +89,9 @@ export class RdsState extends BaseXdsStreamState imp } } if (route.route!.cluster_specifier === 'weighted_clusters') { + if (route.route.weighted_clusters!.total_weight?.value === 0) { + return false; + } let weightSum = 0; for (const clusterWeight of route.route.weighted_clusters!.clusters) { weightSum += clusterWeight.weight?.value ?? 0; diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index d5e5e692..c0db93ae 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.7.0", + "version": "1.7.1", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 88bf3a7e..93b2204c 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -68,6 +68,28 @@ function getNewCallNumber(): number { return callNumber; } +const INAPPROPRIATE_CONTROL_PLANE_CODES: Status[] = [ + Status.OK, + Status.INVALID_ARGUMENT, + Status.NOT_FOUND, + Status.ALREADY_EXISTS, + Status.FAILED_PRECONDITION, + Status.ABORTED, + Status.OUT_OF_RANGE, + Status.DATA_LOSS +] + +function restrictControlPlaneStatusCode(code: Status, details: string): {code: Status, details: string} { + if (INAPPROPRIATE_CONTROL_PLANE_CODES.includes(code)) { + return { + code: Status.INTERNAL, + details: `Invalid status from control plane: ${code} ${Status[code]} ${details}` + } + } else { + return {code, details}; + } +} + /** * An interface that represents a communication channel to a server specified * by a given address. @@ -320,7 +342,7 @@ export class ChannelImplementation implements Channel { this.trace('Name resolution failed with calls queued for config selection'); } if (this.configSelector === null) { - this.currentResolutionError = status; + this.currentResolutionError = {...restrictControlPlaneStatusCode(status.code, status.details), metadata: status.metadata}; } const localQueue = this.configSelectionQueue; this.configSelectionQueue = []; @@ -534,10 +556,11 @@ export class ChannelImplementation implements Channel { }, (error: Error & { code: number }) => { // We assume the error code isn't 0 (Status.OK) - callStream.cancelWithStatus( + const {code, details} = restrictControlPlaneStatusCode( typeof error.code === 'number' ? error.code : Status.UNKNOWN, `Getting metadata from plugin failed with error: ${error.message}` - ); + ) + callStream.cancelWithStatus(code, details); } ); } @@ -549,17 +572,13 @@ export class ChannelImplementation implements Channel { if (callMetadata.getOptions().waitForReady) { this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); } else { - callStream.cancelWithStatus( - pickResult.status!.code, - pickResult.status!.details - ); + const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details); + callStream.cancelWithStatus(code, details); } break; case PickResultType.DROP: - callStream.cancelWithStatus( - pickResult.status!.code, - pickResult.status!.details - ); + const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details); + callStream.cancelWithStatus(code, details); break; default: throw new Error( @@ -668,10 +687,8 @@ export class ChannelImplementation implements Channel { this.tryPick(stream, metadata, callConfig, []); } } else { - stream.cancelWithStatus( - callConfig.status, - 'Failed to route call to method ' + stream.getMethod() - ); + const {code, details} = restrictControlPlaneStatusCode(callConfig.status, 'Failed to route call to method ' + stream.getMethod()); + stream.cancelWithStatus(code, details); } } } diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index 747c5c87..1198dc46 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -108,6 +108,10 @@ export type ClientOptions = Partial & { callInvocationTransformer?: CallInvocationTransformer; }; +function getErrorStackString(error: Error): string { + return error.stack!.split('\n').slice(1).join('\n'); +} + /** * A generic gRPC client. Primarily useful as a base class for all generated * clients. @@ -321,7 +325,7 @@ export class Client { } let responseMessage: ResponseType | null = null; let receivedStatus = false; - const callerStack = (new Error().stack!).split('\n').slice(1).join('\n'); + const callerStackError = new Error(); call.start(callProperties.metadata, { onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); @@ -340,6 +344,7 @@ export class Client { receivedStatus = true; if (status.code === Status.OK) { if (responseMessage === null) { + const callerStack = getErrorStackString(callerStackError); callProperties.callback!(callErrorFromStatus({ code: Status.INTERNAL, details: 'No message received', @@ -349,6 +354,7 @@ export class Client { callProperties.callback!(null, responseMessage); } } else { + const callerStack = getErrorStackString(callerStackError); callProperties.callback!(callErrorFromStatus(status, callerStack)); } emitter.emit('status', status); @@ -447,7 +453,7 @@ export class Client { } let responseMessage: ResponseType | null = null; let receivedStatus = false; - const callerStack = (new Error().stack!).split('\n').slice(1).join('\n'); + const callerStackError = new Error(); call.start(callProperties.metadata, { onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); @@ -466,6 +472,7 @@ export class Client { receivedStatus = true; if (status.code === Status.OK) { if (responseMessage === null) { + const callerStack = getErrorStackString(callerStackError); callProperties.callback!(callErrorFromStatus({ code: Status.INTERNAL, details: 'No message received', @@ -475,6 +482,7 @@ export class Client { callProperties.callback!(null, responseMessage); } } else { + const callerStack = getErrorStackString(callerStackError); callProperties.callback!(callErrorFromStatus(status, callerStack)); } emitter.emit('status', status); @@ -577,7 +585,7 @@ export class Client { call.setCredentials(callProperties.callOptions.credentials); } let receivedStatus = false; - const callerStack = (new Error().stack!).split('\n').slice(1).join('\n'); + const callerStackError = new Error(); call.start(callProperties.metadata, { onReceiveMetadata(metadata: Metadata) { stream.emit('metadata', metadata); @@ -593,6 +601,7 @@ export class Client { receivedStatus = true; stream.push(null); if (status.code !== Status.OK) { + const callerStack = getErrorStackString(callerStackError); stream.emit('error', callErrorFromStatus(status, callerStack)); } stream.emit('status', status); @@ -675,7 +684,7 @@ export class Client { call.setCredentials(callProperties.callOptions.credentials); } let receivedStatus = false; - const callerStack = (new Error().stack!).split('\n').slice(1).join('\n'); + const callerStackError = new Error(); call.start(callProperties.metadata, { onReceiveMetadata(metadata: Metadata) { stream.emit('metadata', metadata); @@ -690,6 +699,7 @@ export class Client { receivedStatus = true; stream.push(null); if (status.code !== Status.OK) { + const callerStack = getErrorStackString(callerStackError); stream.emit('error', callErrorFromStatus(status, callerStack)); } stream.emit('status', status); diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 685cfc60..52a9bfc9 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -38,7 +38,7 @@ function trace(text: string): void { const TYPE_NAME = 'outlier_detection'; -const OUTLIER_DETECTION_ENABLED = process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION !== 'false'; +const OUTLIER_DETECTION_ENABLED = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true'; export interface SuccessRateEjectionConfig { readonly stdev_factor: number; diff --git a/packages/grpc-js/test/test-channel-credentials.ts b/packages/grpc-js/test/test-channel-credentials.ts index d6028f46..2b537ac9 100644 --- a/packages/grpc-js/test/test-channel-credentials.ts +++ b/packages/grpc-js/test/test-channel-credentials.ts @@ -17,12 +17,22 @@ import * as assert from 'assert'; import * as fs from 'fs'; +import * as path from 'path'; import { promisify } from 'util'; +import * as protoLoader from '@grpc/proto-loader'; import { CallCredentials } from '../src/call-credentials'; import { ChannelCredentials } from '../src/channel-credentials'; +import * as grpc from '../src'; +import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; +import { TestServiceClient, TestServiceHandlers } from './generated/TestService'; +import { ProtoGrpcType as TestServiceGrpcType } from './generated/test_service'; -import { assert2, mockFunction } from './common'; +import { assert2, loadProtoFile, mockFunction } from './common'; +import { sendUnaryData, ServerUnaryCall, ServiceError } from '../src'; + +const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); +const echoService = loadProtoFile(protoFile).EchoService as ServiceClientConstructor; class CallCredentialsMock implements CallCredentials { child: CallCredentialsMock | null = null; @@ -138,3 +148,65 @@ describe('ChannelCredentials Implementation', () => { }); }); }); + +describe('ChannelCredentials usage', () => { + let client: ServiceClient; + let server: grpc.Server; + before(async () => { + const {ca, key, cert} = await pFixtures; + const serverCreds = grpc.ServerCredentials.createSsl(null, [{private_key: key, cert_chain: cert}]); + const channelCreds = ChannelCredentials.createSsl(ca); + const callCreds = CallCredentials.createFromMetadataGenerator((options, cb) => { + const metadata = new grpc.Metadata(); + metadata.set('test-key', 'test-value'); + cb(null, metadata); + }); + const combinedCreds = channelCreds.compose(callCreds); + return new Promise((resolve, reject) => { + + server = new grpc.Server(); + server.addService(echoService.service, { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + call.sendMetadata(call.metadata); + callback(null, call.request); + }, + }); + + server.bindAsync( + 'localhost:0', + serverCreds, + (err, port) => { + if (err) { + reject(err); + return; + } + client = new echoService( + `localhost:${port}`, + combinedCreds, + {'grpc.ssl_target_name_override': 'foo.test.google.fr', 'grpc.default_authority': 'foo.test.google.fr'} + ); + server.start(); + resolve(); + } + ); + }); + }); + after(() => { + server.forceShutdown(); + }); + + it('Should send the metadata from call credentials attached to channel credentials', (done) => { + const call = client.echo( + { value: 'test value', value2: 3 }, + assert2.mustCall((error: ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + }) + ); + call.on('metadata', assert2.mustCall((metadata: grpc.Metadata) => { + assert.deepStrictEqual(metadata.get('test-key'), ['test-value']); + + })); + assert2.afterMustCallsSatisfied(done); + }); +}); \ No newline at end of file diff --git a/packages/grpc-js/test/test-outlier-detection.ts b/packages/grpc-js/test/test-outlier-detection.ts index 977a3058..74e53676 100644 --- a/packages/grpc-js/test/test-outlier-detection.ts +++ b/packages/grpc-js/test/test-outlier-detection.ts @@ -19,6 +19,7 @@ import * as assert from 'assert'; import * as path from 'path'; import * as grpc from '../src'; import { loadProtoFile } from './common'; +import { OutlierDetectionLoadBalancingConfig } from '../src/load-balancer-outlier-detection' function multiDone(done: Mocha.Done, target: number) { let count = 0; @@ -67,6 +68,251 @@ const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); const EchoService = loadProtoFile(protoFile) .EchoService as grpc.ServiceClientConstructor; +describe('Outlier detection config validation', () => { + describe('interval', () => { + it('Should reject a negative interval', () => { + const loadBalancingConfig = { + interval: { + seconds: -1, + nanos: 0 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /interval parse error: values out of range for non-negative Duaration/); + }); + it('Should reject a large interval', () => { + const loadBalancingConfig = { + interval: { + seconds: 1e12, + nanos: 0 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /interval parse error: values out of range for non-negative Duaration/); + }); + it('Should reject a negative interval.nanos', () => { + const loadBalancingConfig = { + interval: { + seconds: 0, + nanos: -1 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /interval parse error: values out of range for non-negative Duaration/); + }); + it('Should reject a large interval.nanos', () => { + const loadBalancingConfig = { + interval: { + seconds: 0, + nanos: 1e12 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /interval parse error: values out of range for non-negative Duaration/); + }); + }); + describe('base_ejection_time', () => { + it('Should reject a negative base_ejection_time', () => { + const loadBalancingConfig = { + base_ejection_time: { + seconds: -1, + nanos: 0 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /base_ejection_time parse error: values out of range for non-negative Duaration/); + }); + it('Should reject a large base_ejection_time', () => { + const loadBalancingConfig = { + base_ejection_time: { + seconds: 1e12, + nanos: 0 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /base_ejection_time parse error: values out of range for non-negative Duaration/); + }); + it('Should reject a negative base_ejection_time.nanos', () => { + const loadBalancingConfig = { + base_ejection_time: { + seconds: 0, + nanos: -1 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /base_ejection_time parse error: values out of range for non-negative Duaration/); + }); + it('Should reject a large base_ejection_time.nanos', () => { + const loadBalancingConfig = { + base_ejection_time: { + seconds: 0, + nanos: 1e12 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /base_ejection_time parse error: values out of range for non-negative Duaration/); + }); + }); + describe('max_ejection_time', () => { + it('Should reject a negative max_ejection_time', () => { + const loadBalancingConfig = { + max_ejection_time: { + seconds: -1, + nanos: 0 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /max_ejection_time parse error: values out of range for non-negative Duaration/); + }); + it('Should reject a large max_ejection_time', () => { + const loadBalancingConfig = { + max_ejection_time: { + seconds: 1e12, + nanos: 0 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /max_ejection_time parse error: values out of range for non-negative Duaration/); + }); + it('Should reject a negative max_ejection_time.nanos', () => { + const loadBalancingConfig = { + max_ejection_time: { + seconds: 0, + nanos: -1 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /max_ejection_time parse error: values out of range for non-negative Duaration/); + }); + it('Should reject a large max_ejection_time.nanos', () => { + const loadBalancingConfig = { + max_ejection_time: { + seconds: 0, + nanos: 1e12 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /max_ejection_time parse error: values out of range for non-negative Duaration/); + }); + }); + describe('max_ejection_percent', () => { + it('Should reject a value above 100', () => { + const loadBalancingConfig = { + max_ejection_percent: 101, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /max_ejection_percent parse error: value out of range for percentage/); + }); + it('Should reject a negative value', () => { + const loadBalancingConfig = { + max_ejection_percent: -1, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /max_ejection_percent parse error: value out of range for percentage/); + }); + }); + describe('success_rate_ejection.enforcement_percentage', () => { + it('Should reject a value above 100', () => { + const loadBalancingConfig = { + success_rate_ejection: { + enforcement_percentage: 101 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /success_rate_ejection\.enforcement_percentage parse error: value out of range for percentage/); + }); + it('Should reject a negative value', () => { + const loadBalancingConfig = { + success_rate_ejection: { + enforcement_percentage: -1 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /success_rate_ejection\.enforcement_percentage parse error: value out of range for percentage/); + }); + }); + describe('failure_percentage_ejection.threshold', () => { + it('Should reject a value above 100', () => { + const loadBalancingConfig = { + failure_percentage_ejection: { + threshold: 101 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /failure_percentage_ejection\.threshold parse error: value out of range for percentage/); + }); + it('Should reject a negative value', () => { + const loadBalancingConfig = { + failure_percentage_ejection: { + threshold: -1 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /failure_percentage_ejection\.threshold parse error: value out of range for percentage/); + }); + }); + describe('failure_percentage_ejection.enforcement_percentage', () => { + it('Should reject a value above 100', () => { + const loadBalancingConfig = { + failure_percentage_ejection: { + enforcement_percentage: 101 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /failure_percentage_ejection\.enforcement_percentage parse error: value out of range for percentage/); + }); + it('Should reject a negative value', () => { + const loadBalancingConfig = { + failure_percentage_ejection: { + enforcement_percentage: -1 + }, + child_policy: [{round_robin: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /failure_percentage_ejection\.enforcement_percentage parse error: value out of range for percentage/); + }); + }); +}); + describe('Outlier detection', () => { const GOOD_PORTS = 4; let goodServer: grpc.Server;