diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 8a08276e..6c8bcb67 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -45,7 +45,7 @@ import { EdsState } from './xds-stream-state/eds-state'; import { CdsState } from './xds-stream-state/cds-state'; import { RdsState } from './xds-stream-state/rds-state'; import { LdsState } from './xds-stream-state/lds-state'; -import { Watcher } from './xds-stream-state/xds-stream-state'; +import { HandleResponseResult, ResourcePair, Watcher } from './xds-stream-state/xds-stream-state'; import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment'; import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster'; import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration'; @@ -242,11 +242,14 @@ function getResponseMessages( targetTypeUrl: T, allowedTypeUrls: string[], resources: Any__Output[] -): AdsOutputType[] { - const result: AdsOutputType[] = []; +): ResourcePair>[] { + const result: ResourcePair>[] = []; for (const resource of resources) { if (allowedTypeUrls.includes(resource.type_url)) { - result.push(decodeSingleResource(targetTypeUrl, resource.value)); + result.push({ + resource: decodeSingleResource(targetTypeUrl, resource.value), + raw: resource + }); } else { throw new Error( `ADS Error: Invalid resource type ${resource.type_url}, expected ${allowedTypeUrls}` @@ -450,8 +453,10 @@ export class XdsClient { } private handleAdsResponse(message: DiscoveryResponse__Output) { - let errorString: string | null; - let serviceKind: AdsServiceKind; + let handleResponseResult: { + result: HandleResponseResult; + serviceKind: AdsServiceKind; + } | null = null; let isV2: boolean; switch (message.type_url) { case EDS_TYPE_URL_V2: @@ -463,56 +468,71 @@ export class XdsClient { default: isV2 = false; } - switch (message.type_url) { - case EDS_TYPE_URL_V2: - case EDS_TYPE_URL_V3: - errorString = this.adsState.eds.handleResponses( - getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources), - isV2 - ); - serviceKind = 'eds'; - break; - case CDS_TYPE_URL_V2: - case CDS_TYPE_URL_V3: - errorString = this.adsState.cds.handleResponses( - getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources), - isV2 - ); - serviceKind = 'cds'; - break; - case RDS_TYPE_URL_V2: - case RDS_TYPE_URL_V3: - errorString = this.adsState.rds.handleResponses( - getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources), - isV2 - ); - serviceKind = 'rds'; - break; - case LDS_TYPE_URL_V2: - case LDS_TYPE_URL_V3: - errorString = this.adsState.lds.handleResponses( - getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources), - isV2 - ); - serviceKind = 'lds'; - break; - default: - errorString = `Unknown type_url ${message.type_url}`; - // This is not used in this branch, but setting it makes the types easier to handle - serviceKind = 'eds'; + try { + switch (message.type_url) { + case EDS_TYPE_URL_V2: + case EDS_TYPE_URL_V3: + handleResponseResult = { + result: this.adsState.eds.handleResponses( + getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources), + isV2 + ), + serviceKind: 'eds' + }; + break; + case CDS_TYPE_URL_V2: + case CDS_TYPE_URL_V3: + handleResponseResult = { + result: this.adsState.cds.handleResponses( + getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources), + isV2 + ), + serviceKind: 'cds' + }; + break; + case RDS_TYPE_URL_V2: + case RDS_TYPE_URL_V3: + handleResponseResult = { + result: this.adsState.rds.handleResponses( + getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources), + isV2 + ), + serviceKind: 'rds' + }; + break; + case LDS_TYPE_URL_V2: + case LDS_TYPE_URL_V3: + handleResponseResult = { + result: this.adsState.lds.handleResponses( + getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources), + isV2 + ), + serviceKind: 'lds' + } + break; + } + } catch (e) { + trace('Nacking message with protobuf parsing error: ' + e.message); + this.nack(message.type_url, e.message); } - if (errorString === null) { - trace('Acking message with type URL ' + message.type_url); - /* errorString can only be null in one of the first 4 cases, which - * implies that message.type_url is one of the 4 known type URLs, which - * means that this type assertion is valid. */ - const typeUrl = message.type_url as AdsTypeUrl; - this.adsState[serviceKind].nonce = message.nonce; - this.adsState[serviceKind].versionInfo = message.version_info; - this.ack(serviceKind); + if (handleResponseResult === null) { + // Null handleResponseResult means that the type_url was unrecognized + trace('Nacking message with unknown type URL ' + message.type_url); + this.nack(message.type_url, `Unknown type_url ${message.type_url}`); } else { - trace('Nacking message with type URL ' + message.type_url + ': "' + errorString + '"'); - this.nack(message.type_url, errorString); + if (handleResponseResult.result.rejected.length > 0) { + // rejected.length > 0 means that at least one message validation failed + const errorString = `${handleResponseResult.serviceKind.toUpperCase()} Error: ${handleResponseResult.result.rejected[0].error}`; + trace('Nacking message with type URL ' + message.type_url + ': ' + errorString); + this.nack(message.type_url, errorString); + } else { + // If we get here, all message validation succeeded + trace('Acking message with type URL ' + message.type_url); + const serviceKind = handleResponseResult.serviceKind; + this.adsState[serviceKind].nonce = message.nonce; + this.adsState[serviceKind].versionInfo = message.version_info; + this.ack(serviceKind); + } } } diff --git a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts index 05477388..ce0434b8 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts @@ -17,8 +17,9 @@ import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster"; +import { Any__Output } from "../generated/google/protobuf/Any"; import { EdsState } from "./eds-state"; -import { Watcher, XdsStreamState } from "./xds-stream-state"; +import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; const TRACER_NAME = 'xds_client'; @@ -125,26 +126,40 @@ export class CdsState implements XdsStreamState { * onResourceDoesNotExist method. * @param allClusterNames */ - private handleMissingNames(allClusterNames: Set) { + private handleMissingNames(allClusterNames: Set): string[] { + const missingNames: string[] = []; for (const [clusterName, watcherList] of this.watchers.entries()) { if (!allClusterNames.has(clusterName)) { trace('Reporting CDS resource does not exist for clusterName ' + clusterName); + missingNames.push(clusterName); for (const watcher of watcherList) { watcher.onResourceDoesNotExist(); } } } + return missingNames; } - handleResponses(responses: Cluster__Output[], isV2: boolean): string | null { + handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { const validResponses: Cluster__Output[] = []; - let errorMessage: string | null = null; - for (const message of responses) { - if (this.validateResponse(message)) { - validResponses.push(message); + const result: HandleResponseResult = { + accepted: [], + rejected: [], + missing: [] + } + for (const {resource, raw} of responses) { + if (this.validateResponse(resource)) { + validResponses.push(resource); + result.accepted.push({ + name: resource.name, + raw: raw}); } else { - trace('CDS validation failed for message ' + JSON.stringify(message)); - errorMessage = 'CDS Error: Cluster validation failed'; + trace('CDS validation failed for message ' + JSON.stringify(resource)); + result.rejected.push({ + name: resource.name, + raw: raw, + error: `Cluster validation failed for resource ${resource.name}` + }); } } this.latestResponses = validResponses; @@ -163,9 +178,9 @@ export class CdsState implements XdsStreamState { } } trace('Received CDS updates for cluster names [' + Array.from(allClusterNames) + ']'); - this.handleMissingNames(allClusterNames); + result.missing = this.handleMissingNames(allClusterNames); this.edsState.handleMissingNames(allEdsServiceNames); - return errorMessage; + return result; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts index f5a8b774..5360400c 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -18,7 +18,8 @@ import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { isIPv4, isIPv6 } from "net"; import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment"; -import { Watcher, XdsStreamState } from "./xds-stream-state"; +import { Any__Output } from "../generated/google/protobuf/Any"; +import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; const TRACER_NAME = 'xds_client'; @@ -145,15 +146,26 @@ export class EdsState implements XdsStreamState { } } - handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) { + handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { const validResponses: ClusterLoadAssignment__Output[] = []; - let errorMessage: string | null = null; - for (const message of responses) { - if (this.validateResponse(message)) { - validResponses.push(message); + let result: HandleResponseResult = { + accepted: [], + rejected: [], + missing: [] + } + for (const {resource, raw} of responses) { + if (this.validateResponse(resource)) { + validResponses.push(resource); + result.accepted.push({ + name: resource.cluster_name, + raw: raw}); } else { - trace('EDS validation failed for message ' + JSON.stringify(message)); - errorMessage = 'EDS Error: ClusterLoadAssignment validation failed'; + trace('EDS validation failed for message ' + JSON.stringify(resource)); + result.rejected.push({ + name: resource.cluster_name, + raw: raw, + error: `ClusterLoadAssignment validation failed for resource ${resource.cluster_name}` + }); } } this.latestResponses = validResponses; @@ -167,7 +179,7 @@ export class EdsState implements XdsStreamState { } } trace('Received EDS updates for cluster names [' + Array.from(allClusterNames) + ']'); - return errorMessage; + return result; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts index 7318b3b8..0c4fdc51 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -19,11 +19,12 @@ import * as protoLoader from '@grpc/proto-loader'; import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { Listener__Output } from '../generated/envoy/config/listener/v3/Listener'; import { RdsState } from "./rds-state"; -import { Watcher, XdsStreamState } from "./xds-stream-state"; +import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; import { HttpConnectionManager__Output } from '../generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager'; import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V2, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from '../resources'; import { getTopLevelFilterUrl, validateTopLevelFilter } from '../http-filter'; import { EXPERIMENTAL_FAULT_INJECTION } from '../environment'; +import { Any__Output } from '../generated/google/protobuf/Any'; const TRACER_NAME = 'xds_client'; @@ -143,25 +144,40 @@ export class LdsState implements XdsStreamState { return false; } - private handleMissingNames(allTargetNames: Set) { + private handleMissingNames(allTargetNames: Set): string[] { + const missingNames: string[] = []; for (const [targetName, watcherList] of this.watchers.entries()) { if (!allTargetNames.has(targetName)) { + missingNames.push(targetName); for (const watcher of watcherList) { watcher.onResourceDoesNotExist(); } } } + return missingNames; } - handleResponses(responses: Listener__Output[], isV2: boolean): string | null { + handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { const validResponses: Listener__Output[] = []; - let errorMessage: string | null = null; - for (const message of responses) { - if (this.validateResponse(message, isV2)) { - validResponses.push(message); + let result: HandleResponseResult = { + accepted: [], + rejected: [], + missing: [] + } + for (const {resource, raw} of responses) { + if (this.validateResponse(resource, isV2)) { + validResponses.push(resource); + result.accepted.push({ + name: resource.name, + raw: raw + }); } else { - trace('LDS validation failed for message ' + JSON.stringify(message)); - errorMessage = 'LDS Error: Route validation failed'; + trace('LDS validation failed for message ' + JSON.stringify(resource)); + result.rejected.push({ + name: resource.name, + raw: raw, + error: `Listener validation failed for resource ${resource.name}` + }); } } this.latestResponses = validResponses; @@ -180,9 +196,9 @@ export class LdsState implements XdsStreamState { } } trace('Received LDS response with listener names [' + Array.from(allTargetNames) + ']'); - this.handleMissingNames(allTargetNames); + result.missing = this.handleMissingNames(allTargetNames); this.rdsState.handleMissingNames(allRouteConfigNames); - return errorMessage; + return result; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index bb7e0bc5..0ff4c2aa 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -18,9 +18,10 @@ import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { EXPERIMENTAL_FAULT_INJECTION } from "../environment"; import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration"; +import { Any__Output } from "../generated/google/protobuf/Any"; import { validateOverrideFilter } from "../http-filter"; import { CdsLoadBalancingConfig } from "../load-balancer-cds"; -import { Watcher, XdsStreamState } from "./xds-stream-state"; +import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; import ServiceConfig = experimental.ServiceConfig; const TRACER_NAME = 'xds_client'; @@ -182,15 +183,26 @@ export class RdsState implements XdsStreamState { } } - handleResponses(responses: RouteConfiguration__Output[], isV2: boolean): string | null { + handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { const validResponses: RouteConfiguration__Output[] = []; - let errorMessage: string | null = null; - for (const message of responses) { - if (this.validateResponse(message, isV2)) { - validResponses.push(message); + let result: HandleResponseResult = { + accepted: [], + rejected: [], + missing: [] + } + for (const {resource, raw} of responses) { + if (this.validateResponse(resource, isV2)) { + validResponses.push(resource); + result.accepted.push({ + name: resource.name, + raw: raw}); } else { - trace('RDS validation failed for message ' + JSON.stringify(message)); - errorMessage = 'RDS Error: Route validation failed'; + trace('RDS validation failed for message ' + JSON.stringify(resource)); + result.rejected.push({ + name: resource.name, + raw: raw, + error: `Route validation failed for resource ${resource.name}` + }); } } this.latestResponses = validResponses; @@ -204,7 +216,7 @@ export class RdsState implements XdsStreamState { } } trace('Received RDS response with route config names [' + Array.from(allRouteConfigNames) + ']'); - return errorMessage; + return result; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index 14f3d1c7..c8cbc41c 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -16,6 +16,7 @@ */ import { StatusObject } from "@grpc/grpc-js"; +import { Any__Output } from "../generated/google/protobuf/Any"; export interface Watcher { /* Including the isV2 flag here is a bit of a kludge. It would probably be @@ -28,6 +29,28 @@ export interface Watcher { onResourceDoesNotExist(): void; } +export interface ResourcePair { + resource: ResourceType; + raw: Any__Output; +} + +export interface AcceptedResourceEntry { + name: string; + raw: Any__Output; +} + +export interface RejectedResourceEntry { + name: string; + raw: Any__Output; + error: string; +} + +export interface HandleResponseResult { + accepted: AcceptedResourceEntry[]; + rejected: RejectedResourceEntry[]; + missing: string[]; +} + export interface XdsStreamState { versionInfo: string; nonce: string; @@ -37,7 +60,7 @@ export interface XdsStreamState { * or null if it should be acked. * @param responses */ - handleResponses(responses: ResponseType[], isV2: boolean): string | null; + handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult; reportStreamError(status: StatusObject): void; } \ No newline at end of file