From 7b1bd147a67ab76e11cebc6b0dfd74f253feec3b Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 8 Jul 2020 15:18:15 -0700 Subject: [PATCH] gts fix --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/xds-bootstrap.ts | 477 +++++++++-------- packages/grpc-js/src/xds-client.ts | 734 ++++++++++++++------------ 3 files changed, 655 insertions(+), 558 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 8a20a684..8831891d 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -52,7 +52,7 @@ "prepare": "npm run compile", "test": "gulp test", "check": "gts check src/**/*.ts", - "fix": "gts fix src/**/*.ts", + "fix": "gts fix src/*.ts", "pretest": "npm run compile", "posttest": "npm run check" }, diff --git a/packages/grpc-js/src/xds-bootstrap.ts b/packages/grpc-js/src/xds-bootstrap.ts index c8e88a01..90cded00 100644 --- a/packages/grpc-js/src/xds-bootstrap.ts +++ b/packages/grpc-js/src/xds-bootstrap.ts @@ -1,222 +1,255 @@ -/* - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import * as fs from 'fs'; -import * as adsTypes from './generated/ads'; - -/* eslint-disable @typescript-eslint/no-explicit-any */ - -export interface ChannelCredsConfig { - type: string; - config?: object; -} - -export interface XdsServerConfig { - serverUri: string; - channelCreds: ChannelCredsConfig[]; -} - -export interface BootstrapInfo { - xdsServers: XdsServerConfig[]; - node: adsTypes.messages.envoy.api.v2.core.Node; -} - -function validateChannelCredsConfig(obj: any): ChannelCredsConfig { - if (!('type' in obj)) { - throw new Error('type field missing in xds_servers.channel_creds element'); - } - if (typeof obj.type !== 'string') { - throw new Error(`xds_servers.channel_creds.type field: expected string, got ${typeof obj.type}`); - } - if ('config' in obj) { - if (typeof obj.config !== 'object' || obj.config === null) { - throw new Error('xds_servers.channel_creds config field must be an object if provided'); - } - } - return { - type: obj.type, - config: obj.config - } -} - -function validateXdsServerConfig(obj: any): XdsServerConfig { - if (!('server_uri' in obj)) { - throw new Error('server_uri field missing in xds_servers element'); - } - if (typeof obj.server_uri !== 'string') { - throw new Error(`xds_servers.server_uri field: expected string, got ${typeof obj.server_uri}`); - } - if (!('channel_creds' in obj)) { - throw new Error('channel_creds missing in xds_servers element'); - } - if (!Array.isArray(obj.channel_creds)) { - throw new Error(`xds_servers.channel_creds field: expected array, got ${typeof obj.channel_creds}`); - } - if (obj.channel_creds.length === 0) { - throw new Error('xds_servers.channel_creds field: at least one entry is required'); - } - return { - serverUri: obj.server_uri, - channelCreds: obj.channel_creds.map(validateChannelCredsConfig) - }; -} - -function validateValue(obj: any): adsTypes.messages.google.protobuf.Value { - if (Array.isArray(obj)) { - return { - kind: 'listValue', - listValue: { - values: obj.map(value => validateValue(value)) - } - } - } else { - switch (typeof obj) { - case 'boolean': - return { - kind: 'boolValue', - boolValue: obj - }; - case 'number': - return { - kind: 'numberValue', - numberValue: obj - }; - case 'string': - return { - kind: 'stringValue', - stringValue: obj - }; - case 'object': - if (obj === null) { - return { - kind: 'nullValue', - nullValue: 'NULL_VALUE' - }; - } else { - return { - kind: 'structValue', - structValue: getStructFromJson(obj) - }; - } - default: - throw new Error(`Could not handle struct value of type ${typeof obj}`); - } - } -} - -function getStructFromJson(obj: any): adsTypes.messages.google.protobuf.Struct { - if (typeof obj !== 'object' || obj === null) { - throw new Error('Invalid JSON object for Struct field'); - } - const result = Object.keys(obj).map(key => validateValue(key)); - if (result.length === 1) { - return { - fields: result[0] - } - } else { - return { - fields: { - kind: 'listValue', - listValue: { - values: result - } - } - } - }; -} - -/** - * Validate that the input obj is a valid Node proto message. Only checks the - * fields we expect to see: id, cluster, locality, and metadata. - * @param obj - */ -function validateNode(obj: any): adsTypes.messages.envoy.api.v2.core.Node { - const result: adsTypes.messages.envoy.api.v2.core.Node = {}; - if (!('id' in obj)) { - throw new Error('id field missing in node element'); - } - if (typeof obj.id !== 'string') { - throw new Error(`node.id field: expected string, got ${typeof obj.id}`); - } - result.id = obj.id; - if (!('cluster' in obj)) { - throw new Error('cluster field missing in node element'); - } - if (typeof obj.cluster !== 'string') { - throw new Error(`node.cluster field: expected string, got ${typeof obj.cluster}`); - } - result.cluster = obj.cluster; - if (!('locality' in obj)) { - throw new Error('locality field missing in node element'); - } - result.locality = {}; - if ('region' in obj.locality) { - if (typeof obj.locality.region !== 'string') { - throw new Error(`node.locality.region field: expected string, got ${typeof obj.locality.region}`); - } - result.locality.region = obj.locality.region; - } - if ('zone' in obj.locality) { - if (typeof obj.locality.region !== 'string') { - throw new Error(`node.locality.zone field: expected string, got ${typeof obj.locality.zone}`); - } - result.locality.zone = obj.locality.zone; - } - if ('sub_zone' in obj.locality) { - if (typeof obj.locality.sub_zone !== 'string') { - throw new Error(`node.locality.sub_zone field: expected string, got ${typeof obj.locality.sub_zone}`); - } - result.locality.sub_zone = obj.locality.sub_zone; - } - if ('metadata' in obj) { - result.metadata = getStructFromJson(obj.metadata); - } - return result; -} - -function validateBootstrapFile(obj: any): BootstrapInfo { - return { - xdsServers: obj.xds_servers.map(validateXdsServerConfig), - node: validateNode(obj.node) - } -} - -let loadedBootstrapInfo: Promise | null = null; - -export async function loadBootstrapInfo(): Promise { - if (loadedBootstrapInfo !== null) { - return loadedBootstrapInfo; - } - const bootstrapPath = process.env.GRPC_XDS_BOOTSTRAP; - if (bootstrapPath === undefined) { - return Promise.reject(new Error('The GRPC_XDS_BOOTSTRAP environment variable needs to be set to the path to the bootstrap file to use xDS')); - } - loadedBootstrapInfo = new Promise((resolve, reject) => { - fs.readFile(bootstrapPath, { encoding: 'utf8'}, (err, data) => { - if (err) { - reject(new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${err.message}`)); - } - try { - const parsedFile = JSON.parse(data); - resolve(validateBootstrapFile(parsedFile)); - } catch (e) { - reject(new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}`)); - } - }); - }); - return loadedBootstrapInfo; -} \ No newline at end of file +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as fs from 'fs'; +import * as adsTypes from './generated/ads'; + +/* eslint-disable @typescript-eslint/no-explicit-any */ + +export interface ChannelCredsConfig { + type: string; + config?: object; +} + +export interface XdsServerConfig { + serverUri: string; + channelCreds: ChannelCredsConfig[]; +} + +export interface BootstrapInfo { + xdsServers: XdsServerConfig[]; + node: adsTypes.messages.envoy.api.v2.core.Node; +} + +function validateChannelCredsConfig(obj: any): ChannelCredsConfig { + if (!('type' in obj)) { + throw new Error('type field missing in xds_servers.channel_creds element'); + } + if (typeof obj.type !== 'string') { + throw new Error( + `xds_servers.channel_creds.type field: expected string, got ${typeof obj.type}` + ); + } + if ('config' in obj) { + if (typeof obj.config !== 'object' || obj.config === null) { + throw new Error( + 'xds_servers.channel_creds config field must be an object if provided' + ); + } + } + return { + type: obj.type, + config: obj.config, + }; +} + +function validateXdsServerConfig(obj: any): XdsServerConfig { + if (!('server_uri' in obj)) { + throw new Error('server_uri field missing in xds_servers element'); + } + if (typeof obj.server_uri !== 'string') { + throw new Error( + `xds_servers.server_uri field: expected string, got ${typeof obj.server_uri}` + ); + } + if (!('channel_creds' in obj)) { + throw new Error('channel_creds missing in xds_servers element'); + } + if (!Array.isArray(obj.channel_creds)) { + throw new Error( + `xds_servers.channel_creds field: expected array, got ${typeof obj.channel_creds}` + ); + } + if (obj.channel_creds.length === 0) { + throw new Error( + 'xds_servers.channel_creds field: at least one entry is required' + ); + } + return { + serverUri: obj.server_uri, + channelCreds: obj.channel_creds.map(validateChannelCredsConfig), + }; +} + +function validateValue(obj: any): adsTypes.messages.google.protobuf.Value { + if (Array.isArray(obj)) { + return { + kind: 'listValue', + listValue: { + values: obj.map((value) => validateValue(value)), + }, + }; + } else { + switch (typeof obj) { + case 'boolean': + return { + kind: 'boolValue', + boolValue: obj, + }; + case 'number': + return { + kind: 'numberValue', + numberValue: obj, + }; + case 'string': + return { + kind: 'stringValue', + stringValue: obj, + }; + case 'object': + if (obj === null) { + return { + kind: 'nullValue', + nullValue: 'NULL_VALUE', + }; + } else { + return { + kind: 'structValue', + structValue: getStructFromJson(obj), + }; + } + default: + throw new Error(`Could not handle struct value of type ${typeof obj}`); + } + } +} + +function getStructFromJson(obj: any): adsTypes.messages.google.protobuf.Struct { + if (typeof obj !== 'object' || obj === null) { + throw new Error('Invalid JSON object for Struct field'); + } + const result = Object.keys(obj).map((key) => validateValue(key)); + if (result.length === 1) { + return { + fields: result[0], + }; + } else { + return { + fields: { + kind: 'listValue', + listValue: { + values: result, + }, + }, + }; + } +} + +/** + * Validate that the input obj is a valid Node proto message. Only checks the + * fields we expect to see: id, cluster, locality, and metadata. + * @param obj + */ +function validateNode(obj: any): adsTypes.messages.envoy.api.v2.core.Node { + const result: adsTypes.messages.envoy.api.v2.core.Node = {}; + if (!('id' in obj)) { + throw new Error('id field missing in node element'); + } + if (typeof obj.id !== 'string') { + throw new Error(`node.id field: expected string, got ${typeof obj.id}`); + } + result.id = obj.id; + if (!('cluster' in obj)) { + throw new Error('cluster field missing in node element'); + } + if (typeof obj.cluster !== 'string') { + throw new Error( + `node.cluster field: expected string, got ${typeof obj.cluster}` + ); + } + result.cluster = obj.cluster; + if (!('locality' in obj)) { + throw new Error('locality field missing in node element'); + } + result.locality = {}; + if ('region' in obj.locality) { + if (typeof obj.locality.region !== 'string') { + throw new Error( + `node.locality.region field: expected string, got ${typeof obj.locality + .region}` + ); + } + result.locality.region = obj.locality.region; + } + if ('zone' in obj.locality) { + if (typeof obj.locality.region !== 'string') { + throw new Error( + `node.locality.zone field: expected string, got ${typeof obj.locality + .zone}` + ); + } + result.locality.zone = obj.locality.zone; + } + if ('sub_zone' in obj.locality) { + if (typeof obj.locality.sub_zone !== 'string') { + throw new Error( + `node.locality.sub_zone field: expected string, got ${typeof obj + .locality.sub_zone}` + ); + } + result.locality.sub_zone = obj.locality.sub_zone; + } + if ('metadata' in obj) { + result.metadata = getStructFromJson(obj.metadata); + } + return result; +} + +function validateBootstrapFile(obj: any): BootstrapInfo { + return { + xdsServers: obj.xds_servers.map(validateXdsServerConfig), + node: validateNode(obj.node), + }; +} + +let loadedBootstrapInfo: Promise | null = null; + +export async function loadBootstrapInfo(): Promise { + if (loadedBootstrapInfo !== null) { + return loadedBootstrapInfo; + } + const bootstrapPath = process.env.GRPC_XDS_BOOTSTRAP; + if (bootstrapPath === undefined) { + return Promise.reject( + new Error( + 'The GRPC_XDS_BOOTSTRAP environment variable needs to be set to the path to the bootstrap file to use xDS' + ) + ); + } + loadedBootstrapInfo = new Promise((resolve, reject) => { + fs.readFile(bootstrapPath, { encoding: 'utf8' }, (err, data) => { + if (err) { + reject( + new Error( + `Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${err.message}` + ) + ); + } + try { + const parsedFile = JSON.parse(data); + resolve(validateBootstrapFile(parsedFile)); + } catch (e) { + reject( + new Error( + `Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}` + ) + ); + } + }); + }); + return loadedBootstrapInfo; +} diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index 05076214..732858a3 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -1,335 +1,399 @@ -/* - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import * as protoLoader from '@grpc/proto-loader'; -import { loadPackageDefinition } from './make-client'; -import * as adsTypes from './generated/ads'; -import * as edsTypes from './generated/endpoint'; -import { createGoogleDefaultCredentials } from './channel-credentials'; -import { loadBootstrapInfo } from './xds-bootstrap'; -import { ClientDuplexStream, ServiceError } from './call'; -import { StatusObject } from './call-stream'; -import { isIPv4, isIPv6 } from 'net'; -import { Status, LogVerbosity } from './constants'; -import { Metadata } from './metadata'; -import * as logging from './logging'; -import { ServiceConfig } from './service-config'; -import { ChannelOptions } from './channel-options'; - -const TRACER_NAME = 'xds_client'; - -function trace(text: string): void { - logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); -} - -const clientVersion = require('../../package.json').version; - -const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment'; - -let loadedProtos: Promise | null = null; - -function loadAdsProtos(): Promise { - if (loadedProtos !== null) { - return loadedProtos; - } - loadedProtos = protoLoader.load([ - 'envoy/service/discovery/v2/ads.proto', - 'envoy/api/v2/listener.proto', - 'envoy/api/v2/route.proto', - 'envoy/api/v2/cluster.proto', - 'envoy/api/v2/endpoint.proto' - ], { - keepCase: true, - longs: String, - enums: String, - defaults: true, - oneofs: true, - includeDirs: [ - 'deps/envoy-api/', - 'deps/udpa/', - 'node_modules/protobufjs/', - 'deps/googleapis/', - 'deps/protoc-gen-validate/' - ] - }).then(packageDefinition => loadPackageDefinition(packageDefinition) as unknown as adsTypes.ProtoGrpcType); - return loadedProtos; -} - -export interface Watcher { - onValidUpdate(update: UpdateType): void; - onTransientError(error: StatusObject): void; - onResourceDoesNotExist(): void; -} - -export class XdsClient { - private node: adsTypes.messages.envoy.api.v2.core.Node | null = null; - private client: adsTypes.ClientInterfaces.envoy.service.discovery.v2.AggregatedDiscoveryServiceClient | null = null; - private adsCall: ClientDuplexStream | null = null; - - private hasShutdown: boolean = false; - - private endpointWatchers: Map[]> = new Map[]>(); - private lastEdsVersionInfo: string = ''; - private lastEdsNonce: string = ''; - - constructor(private targetName: string, private serviceConfigWatcher: Watcher, channelOptions: ChannelOptions) { - const channelArgs = {...channelOptions}; - const channelArgsToRemove = [ - /* The SSL target name override corresponds to the target, and this - * client has its own target */ - 'grpc.ssl_target_name_override', - /* The default authority also corresponds to the target */ - 'grpc.default_authority', - /* This client will have its own specific keepalive time setting */ - 'grpc.keepalive_time_ms', - /* The service config specifies the load balancing policy. This channel - * needs its own separate load balancing policy setting. In particular, - * recursively using an xDS load balancer for the xDS client would be - * bad */ - 'grpc.service_config' - ]; - for (const arg of channelArgsToRemove) { - delete channelArgs[arg]; - } - channelArgs['grpc.keepalive_time_ms'] = 5000; - Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(([bootstrapInfo, protoDefinitions]) => { - if (this.hasShutdown) { - return; - } - this.node = { - ...bootstrapInfo.node, - build_version: `gRPC Node Pure JS ${clientVersion}`, - user_agent_name: 'gRPC Node Pure JS' - } - this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials(), channelArgs); - this.maybeStartAdsStream(); - }, (error) => { - trace('Failed to initialize xDS Client. ' + error.message); - // Bubble this error up to any listeners - this.reportStreamError({ - code: Status.INTERNAL, - details: `Failed to initialize xDS Client. ${error.message}`, - metadata: new Metadata() - }); - }); - } - - /** - * Start the ADS stream if the client exists and there is not already an - * existing stream, and there - */ - private maybeStartAdsStream() { - if (this.client === null) { - return; - } - if (this.adsCall !== null) { - return; - } - if (this.hasShutdown) { - return; - } - this.adsCall = this.client.StreamAggregatedResources(); - this.adsCall.on('data', (message: adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output) => { - switch (message.type_url) { - case EDS_TYPE_URL: - const edsResponses: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output[] = []; - for (const resource of message.resources) { - if (protoLoader.isAnyExtension(resource) && resource['@type'] === EDS_TYPE_URL) { - const resp = resource as protoLoader.AnyExtension & edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output; - if (!this.validateEdsResponse(resp)) { - this.nackEds('ClusterLoadAssignment validation failed'); - return; - } - edsResponses.push(resp); - } else { - this.nackEds(`Invalid resource type ${protoLoader.isAnyExtension(resource) ? resource['@type'] : resource.type_url}`); - return; - } - } - for (const message of edsResponses) { - this.handleEdsResponse(message); - } - this.lastEdsVersionInfo = message.version_info; - this.lastEdsNonce = message.nonce; - this.ackEds(); - break; - default: - this.nackUnknown(message.type_url, message.version_info, message.nonce); - } - }); - this.adsCall.on('error', (error: ServiceError) => { - trace('ADS stream ended. code=' + error.code + ' details= ' + error.details); - this.adsCall = null; - this.reportStreamError(error); - /* Connection backoff is handled by the client object, so we can - * immediately start a new request to indicate that it should try to - * reconnect */ - this.maybeStartAdsStream(); - }); - const endpointWatcherNames = Array.from(this.endpointWatchers.keys()); - if (endpointWatcherNames.length > 0) { - this.adsCall.write({ - node: this.node!, - type_url: EDS_TYPE_URL, - resource_names: endpointWatcherNames - }); - } - } - - private nackUnknown(typeUrl: string, versionInfo: string, nonce: string) { - if (!this.adsCall) { - return; - } - this.adsCall.write({ - node: this.node!, - type_url: typeUrl, - version_info: versionInfo, - response_nonce: nonce, - error_detail: { - message: `Unknown type_url ${typeUrl}` - } - }); - } - - /** - * Acknowledge an EDS update. This should be called after the local nonce and - * version info are updated so that it sends the post-update values. - */ - private ackEds() { - if (!this.adsCall) { - return; - } - this.adsCall.write({ - node: this.node!, - type_url: EDS_TYPE_URL, - resource_names: Array.from(this.endpointWatchers.keys()), - response_nonce: this.lastEdsNonce, - version_info: this.lastEdsVersionInfo - }); - } - - /** - * Reject an EDS update. This should be called without updating the local - * nonce and version info. - */ - private nackEds(message: string) { - if (!this.adsCall) { - return; - } - this.adsCall.write({ - node: this.node!, - type_url: EDS_TYPE_URL, - resource_names: Array.from(this.endpointWatchers.keys()), - response_nonce: this.lastEdsNonce, - version_info: this.lastEdsVersionInfo, - error_detail: { - message - } - }); - } - - /** - * Validate the ClusterLoadAssignment object by these rules: - * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto - * @param message - */ - private validateEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output): boolean { - for (const endpoint of message.endpoints) { - for (const lb of endpoint.lb_endpoints) { - const socketAddress = lb.endpoint?.address?.socket_address; - if (!socketAddress) { - return false; - } - if (socketAddress.port_specifier !== 'port_value') { - return false; - } - if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { - return false; - } - } - } - return true; - } - - private handleEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output) { - const watchers = this.endpointWatchers.get(message.cluster_name) ?? []; - for (const watcher of watchers) { - watcher.onValidUpdate(message); - } - } - - private updateEdsNames() { - if (this.adsCall) { - this.adsCall.write({ - node: this.node!, - type_url: EDS_TYPE_URL, - resource_names: Array.from(this.endpointWatchers.keys()), - response_nonce: this.lastEdsNonce, - version_info: this.lastEdsVersionInfo - }); - } - } - - private reportStreamError(status: StatusObject) { - for (const watcherList of this.endpointWatchers.values()) { - for (const watcher of watcherList) { - watcher.onTransientError(status); - } - } - // Also do the same for other types of watchers when those are implemented - } - - addEndpointWatcher(edsServiceName: string, watcher: Watcher) { - trace('Watcher added for endpoint ' + edsServiceName); - let watchersEntry = this.endpointWatchers.get(edsServiceName); - let addedServiceName = false; - if (watchersEntry === undefined) { - addedServiceName = true; - watchersEntry = []; - this.endpointWatchers.set(edsServiceName, watchersEntry); - } - watchersEntry.push(watcher); - if (addedServiceName) { - this.updateEdsNames(); - } - } - - removeEndpointWatcher(edsServiceName: string, watcher: Watcher) { - trace('Watcher removed for endpoint ' + edsServiceName); - const watchersEntry = this.endpointWatchers.get(edsServiceName); - let removedServiceName = false; - if (watchersEntry !== undefined) { - const entryIndex = watchersEntry.indexOf(watcher); - if (entryIndex >= 0) { - watchersEntry.splice(entryIndex, 1); - } - if (watchersEntry.length === 0) { - removedServiceName = true; - this.endpointWatchers.delete(edsServiceName); - } - } - if (removedServiceName) { - this.updateEdsNames(); - } - } - - shutdown(): void { - this.adsCall?.cancel(); - this.client?.close(); - this.hasShutdown = true; - } -} \ No newline at end of file +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as protoLoader from '@grpc/proto-loader'; +import { loadPackageDefinition } from './make-client'; +import * as adsTypes from './generated/ads'; +import * as edsTypes from './generated/endpoint'; +import { createGoogleDefaultCredentials } from './channel-credentials'; +import { loadBootstrapInfo } from './xds-bootstrap'; +import { ClientDuplexStream, ServiceError } from './call'; +import { StatusObject } from './call-stream'; +import { isIPv4, isIPv6 } from 'net'; +import { Status, LogVerbosity } from './constants'; +import { Metadata } from './metadata'; +import * as logging from './logging'; +import { ServiceConfig } from './service-config'; +import { ChannelOptions } from './channel-options'; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} + +const clientVersion = require('../../package.json').version; + +const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment'; + +let loadedProtos: Promise | null = null; + +function loadAdsProtos(): Promise { + if (loadedProtos !== null) { + return loadedProtos; + } + loadedProtos = protoLoader + .load( + [ + 'envoy/service/discovery/v2/ads.proto', + 'envoy/api/v2/listener.proto', + 'envoy/api/v2/route.proto', + 'envoy/api/v2/cluster.proto', + 'envoy/api/v2/endpoint.proto', + ], + { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + includeDirs: [ + 'deps/envoy-api/', + 'deps/udpa/', + 'node_modules/protobufjs/', + 'deps/googleapis/', + 'deps/protoc-gen-validate/', + ], + } + ) + .then( + (packageDefinition) => + (loadPackageDefinition( + packageDefinition + ) as unknown) as adsTypes.ProtoGrpcType + ); + return loadedProtos; +} + +export interface Watcher { + onValidUpdate(update: UpdateType): void; + onTransientError(error: StatusObject): void; + onResourceDoesNotExist(): void; +} + +export class XdsClient { + private node: adsTypes.messages.envoy.api.v2.core.Node | null = null; + private client: adsTypes.ClientInterfaces.envoy.service.discovery.v2.AggregatedDiscoveryServiceClient | null = null; + private adsCall: ClientDuplexStream< + adsTypes.messages.envoy.api.v2.DiscoveryRequest, + adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output + > | null = null; + + private hasShutdown = false; + + private endpointWatchers: Map< + string, + Watcher[] + > = new Map< + string, + Watcher[] + >(); + private lastEdsVersionInfo = ''; + private lastEdsNonce = ''; + + constructor( + private targetName: string, + private serviceConfigWatcher: Watcher, + channelOptions: ChannelOptions + ) { + const channelArgs = { ...channelOptions }; + const channelArgsToRemove = [ + /* The SSL target name override corresponds to the target, and this + * client has its own target */ + 'grpc.ssl_target_name_override', + /* The default authority also corresponds to the target */ + 'grpc.default_authority', + /* This client will have its own specific keepalive time setting */ + 'grpc.keepalive_time_ms', + /* The service config specifies the load balancing policy. This channel + * needs its own separate load balancing policy setting. In particular, + * recursively using an xDS load balancer for the xDS client would be + * bad */ + 'grpc.service_config', + ]; + for (const arg of channelArgsToRemove) { + delete channelArgs[arg]; + } + channelArgs['grpc.keepalive_time_ms'] = 5000; + Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then( + ([bootstrapInfo, protoDefinitions]) => { + if (this.hasShutdown) { + return; + } + this.node = { + ...bootstrapInfo.node, + build_version: `gRPC Node Pure JS ${clientVersion}`, + user_agent_name: 'gRPC Node Pure JS', + }; + this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService( + bootstrapInfo.xdsServers[0].serverUri, + createGoogleDefaultCredentials(), + channelArgs + ); + this.maybeStartAdsStream(); + }, + (error) => { + trace('Failed to initialize xDS Client. ' + error.message); + // Bubble this error up to any listeners + this.reportStreamError({ + code: Status.INTERNAL, + details: `Failed to initialize xDS Client. ${error.message}`, + metadata: new Metadata(), + }); + } + ); + } + + /** + * Start the ADS stream if the client exists and there is not already an + * existing stream, and there + */ + private maybeStartAdsStream() { + if (this.client === null) { + return; + } + if (this.adsCall !== null) { + return; + } + if (this.hasShutdown) { + return; + } + this.adsCall = this.client.StreamAggregatedResources(); + this.adsCall.on( + 'data', + (message: adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output) => { + switch (message.type_url) { + case EDS_TYPE_URL: { + const edsResponses: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output[] = []; + for (const resource of message.resources) { + if ( + protoLoader.isAnyExtension(resource) && + resource['@type'] === EDS_TYPE_URL + ) { + const resp = resource as protoLoader.AnyExtension & + edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output; + if (!this.validateEdsResponse(resp)) { + this.nackEds('ClusterLoadAssignment validation failed'); + return; + } + edsResponses.push(resp); + } else { + this.nackEds( + `Invalid resource type ${ + protoLoader.isAnyExtension(resource) + ? resource['@type'] + : resource.type_url + }` + ); + return; + } + } + for (const message of edsResponses) { + this.handleEdsResponse(message); + } + this.lastEdsVersionInfo = message.version_info; + this.lastEdsNonce = message.nonce; + this.ackEds(); + break; + } + default: + this.nackUnknown( + message.type_url, + message.version_info, + message.nonce + ); + } + } + ); + this.adsCall.on('error', (error: ServiceError) => { + trace( + 'ADS stream ended. code=' + error.code + ' details= ' + error.details + ); + this.adsCall = null; + this.reportStreamError(error); + /* Connection backoff is handled by the client object, so we can + * immediately start a new request to indicate that it should try to + * reconnect */ + this.maybeStartAdsStream(); + }); + const endpointWatcherNames = Array.from(this.endpointWatchers.keys()); + if (endpointWatcherNames.length > 0) { + this.adsCall.write({ + node: this.node!, + type_url: EDS_TYPE_URL, + resource_names: endpointWatcherNames, + }); + } + } + + private nackUnknown(typeUrl: string, versionInfo: string, nonce: string) { + if (!this.adsCall) { + return; + } + this.adsCall.write({ + node: this.node!, + type_url: typeUrl, + version_info: versionInfo, + response_nonce: nonce, + error_detail: { + message: `Unknown type_url ${typeUrl}`, + }, + }); + } + + /** + * Acknowledge an EDS update. This should be called after the local nonce and + * version info are updated so that it sends the post-update values. + */ + private ackEds() { + if (!this.adsCall) { + return; + } + this.adsCall.write({ + node: this.node!, + type_url: EDS_TYPE_URL, + resource_names: Array.from(this.endpointWatchers.keys()), + response_nonce: this.lastEdsNonce, + version_info: this.lastEdsVersionInfo, + }); + } + + /** + * Reject an EDS update. This should be called without updating the local + * nonce and version info. + */ + private nackEds(message: string) { + if (!this.adsCall) { + return; + } + this.adsCall.write({ + node: this.node!, + type_url: EDS_TYPE_URL, + resource_names: Array.from(this.endpointWatchers.keys()), + response_nonce: this.lastEdsNonce, + version_info: this.lastEdsVersionInfo, + error_detail: { + message, + }, + }); + } + + /** + * Validate the ClusterLoadAssignment object by these rules: + * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto + * @param message + */ + private validateEdsResponse( + message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output + ): boolean { + for (const endpoint of message.endpoints) { + for (const lb of endpoint.lb_endpoints) { + const socketAddress = lb.endpoint?.address?.socket_address; + if (!socketAddress) { + return false; + } + if (socketAddress.port_specifier !== 'port_value') { + return false; + } + if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { + return false; + } + } + } + return true; + } + + private handleEdsResponse( + message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output + ) { + const watchers = this.endpointWatchers.get(message.cluster_name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + + private updateEdsNames() { + if (this.adsCall) { + this.adsCall.write({ + node: this.node!, + type_url: EDS_TYPE_URL, + resource_names: Array.from(this.endpointWatchers.keys()), + response_nonce: this.lastEdsNonce, + version_info: this.lastEdsVersionInfo, + }); + } + } + + private reportStreamError(status: StatusObject) { + for (const watcherList of this.endpointWatchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + // Also do the same for other types of watchers when those are implemented + } + + addEndpointWatcher( + edsServiceName: string, + watcher: Watcher< + edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output + > + ) { + trace('Watcher added for endpoint ' + edsServiceName); + let watchersEntry = this.endpointWatchers.get(edsServiceName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.endpointWatchers.set(edsServiceName, watchersEntry); + } + watchersEntry.push(watcher); + if (addedServiceName) { + this.updateEdsNames(); + } + } + + removeEndpointWatcher( + edsServiceName: string, + watcher: Watcher< + edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output + > + ) { + trace('Watcher removed for endpoint ' + edsServiceName); + const watchersEntry = this.endpointWatchers.get(edsServiceName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.endpointWatchers.delete(edsServiceName); + } + } + if (removedServiceName) { + this.updateEdsNames(); + } + } + + shutdown(): void { + this.adsCall?.cancel(); + this.client?.close(); + this.hasShutdown = true; + } +}