This commit is contained in:
Michael Lumish 2020-07-08 15:18:15 -07:00
parent 0c41a4e039
commit 7b1bd147a6
3 changed files with 655 additions and 558 deletions

View File

@ -52,7 +52,7 @@
"prepare": "npm run compile",
"test": "gulp test",
"check": "gts check src/**/*.ts",
"fix": "gts fix src/**/*.ts",
"fix": "gts fix src/*.ts",
"pretest": "npm run compile",
"posttest": "npm run check"
},

View File

@ -1,222 +1,255 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as fs from 'fs';
import * as adsTypes from './generated/ads';
/* eslint-disable @typescript-eslint/no-explicit-any */
export interface ChannelCredsConfig {
type: string;
config?: object;
}
export interface XdsServerConfig {
serverUri: string;
channelCreds: ChannelCredsConfig[];
}
export interface BootstrapInfo {
xdsServers: XdsServerConfig[];
node: adsTypes.messages.envoy.api.v2.core.Node;
}
function validateChannelCredsConfig(obj: any): ChannelCredsConfig {
if (!('type' in obj)) {
throw new Error('type field missing in xds_servers.channel_creds element');
}
if (typeof obj.type !== 'string') {
throw new Error(`xds_servers.channel_creds.type field: expected string, got ${typeof obj.type}`);
}
if ('config' in obj) {
if (typeof obj.config !== 'object' || obj.config === null) {
throw new Error('xds_servers.channel_creds config field must be an object if provided');
}
}
return {
type: obj.type,
config: obj.config
}
}
function validateXdsServerConfig(obj: any): XdsServerConfig {
if (!('server_uri' in obj)) {
throw new Error('server_uri field missing in xds_servers element');
}
if (typeof obj.server_uri !== 'string') {
throw new Error(`xds_servers.server_uri field: expected string, got ${typeof obj.server_uri}`);
}
if (!('channel_creds' in obj)) {
throw new Error('channel_creds missing in xds_servers element');
}
if (!Array.isArray(obj.channel_creds)) {
throw new Error(`xds_servers.channel_creds field: expected array, got ${typeof obj.channel_creds}`);
}
if (obj.channel_creds.length === 0) {
throw new Error('xds_servers.channel_creds field: at least one entry is required');
}
return {
serverUri: obj.server_uri,
channelCreds: obj.channel_creds.map(validateChannelCredsConfig)
};
}
function validateValue(obj: any): adsTypes.messages.google.protobuf.Value {
if (Array.isArray(obj)) {
return {
kind: 'listValue',
listValue: {
values: obj.map(value => validateValue(value))
}
}
} else {
switch (typeof obj) {
case 'boolean':
return {
kind: 'boolValue',
boolValue: obj
};
case 'number':
return {
kind: 'numberValue',
numberValue: obj
};
case 'string':
return {
kind: 'stringValue',
stringValue: obj
};
case 'object':
if (obj === null) {
return {
kind: 'nullValue',
nullValue: 'NULL_VALUE'
};
} else {
return {
kind: 'structValue',
structValue: getStructFromJson(obj)
};
}
default:
throw new Error(`Could not handle struct value of type ${typeof obj}`);
}
}
}
function getStructFromJson(obj: any): adsTypes.messages.google.protobuf.Struct {
if (typeof obj !== 'object' || obj === null) {
throw new Error('Invalid JSON object for Struct field');
}
const result = Object.keys(obj).map(key => validateValue(key));
if (result.length === 1) {
return {
fields: result[0]
}
} else {
return {
fields: {
kind: 'listValue',
listValue: {
values: result
}
}
}
};
}
/**
* Validate that the input obj is a valid Node proto message. Only checks the
* fields we expect to see: id, cluster, locality, and metadata.
* @param obj
*/
function validateNode(obj: any): adsTypes.messages.envoy.api.v2.core.Node {
const result: adsTypes.messages.envoy.api.v2.core.Node = {};
if (!('id' in obj)) {
throw new Error('id field missing in node element');
}
if (typeof obj.id !== 'string') {
throw new Error(`node.id field: expected string, got ${typeof obj.id}`);
}
result.id = obj.id;
if (!('cluster' in obj)) {
throw new Error('cluster field missing in node element');
}
if (typeof obj.cluster !== 'string') {
throw new Error(`node.cluster field: expected string, got ${typeof obj.cluster}`);
}
result.cluster = obj.cluster;
if (!('locality' in obj)) {
throw new Error('locality field missing in node element');
}
result.locality = {};
if ('region' in obj.locality) {
if (typeof obj.locality.region !== 'string') {
throw new Error(`node.locality.region field: expected string, got ${typeof obj.locality.region}`);
}
result.locality.region = obj.locality.region;
}
if ('zone' in obj.locality) {
if (typeof obj.locality.region !== 'string') {
throw new Error(`node.locality.zone field: expected string, got ${typeof obj.locality.zone}`);
}
result.locality.zone = obj.locality.zone;
}
if ('sub_zone' in obj.locality) {
if (typeof obj.locality.sub_zone !== 'string') {
throw new Error(`node.locality.sub_zone field: expected string, got ${typeof obj.locality.sub_zone}`);
}
result.locality.sub_zone = obj.locality.sub_zone;
}
if ('metadata' in obj) {
result.metadata = getStructFromJson(obj.metadata);
}
return result;
}
function validateBootstrapFile(obj: any): BootstrapInfo {
return {
xdsServers: obj.xds_servers.map(validateXdsServerConfig),
node: validateNode(obj.node)
}
}
let loadedBootstrapInfo: Promise<BootstrapInfo> | null = null;
export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
if (loadedBootstrapInfo !== null) {
return loadedBootstrapInfo;
}
const bootstrapPath = process.env.GRPC_XDS_BOOTSTRAP;
if (bootstrapPath === undefined) {
return Promise.reject(new Error('The GRPC_XDS_BOOTSTRAP environment variable needs to be set to the path to the bootstrap file to use xDS'));
}
loadedBootstrapInfo = new Promise((resolve, reject) => {
fs.readFile(bootstrapPath, { encoding: 'utf8'}, (err, data) => {
if (err) {
reject(new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${err.message}`));
}
try {
const parsedFile = JSON.parse(data);
resolve(validateBootstrapFile(parsedFile));
} catch (e) {
reject(new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}`));
}
});
});
return loadedBootstrapInfo;
}
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as fs from 'fs';
import * as adsTypes from './generated/ads';
/* eslint-disable @typescript-eslint/no-explicit-any */
export interface ChannelCredsConfig {
type: string;
config?: object;
}
export interface XdsServerConfig {
serverUri: string;
channelCreds: ChannelCredsConfig[];
}
export interface BootstrapInfo {
xdsServers: XdsServerConfig[];
node: adsTypes.messages.envoy.api.v2.core.Node;
}
function validateChannelCredsConfig(obj: any): ChannelCredsConfig {
if (!('type' in obj)) {
throw new Error('type field missing in xds_servers.channel_creds element');
}
if (typeof obj.type !== 'string') {
throw new Error(
`xds_servers.channel_creds.type field: expected string, got ${typeof obj.type}`
);
}
if ('config' in obj) {
if (typeof obj.config !== 'object' || obj.config === null) {
throw new Error(
'xds_servers.channel_creds config field must be an object if provided'
);
}
}
return {
type: obj.type,
config: obj.config,
};
}
function validateXdsServerConfig(obj: any): XdsServerConfig {
if (!('server_uri' in obj)) {
throw new Error('server_uri field missing in xds_servers element');
}
if (typeof obj.server_uri !== 'string') {
throw new Error(
`xds_servers.server_uri field: expected string, got ${typeof obj.server_uri}`
);
}
if (!('channel_creds' in obj)) {
throw new Error('channel_creds missing in xds_servers element');
}
if (!Array.isArray(obj.channel_creds)) {
throw new Error(
`xds_servers.channel_creds field: expected array, got ${typeof obj.channel_creds}`
);
}
if (obj.channel_creds.length === 0) {
throw new Error(
'xds_servers.channel_creds field: at least one entry is required'
);
}
return {
serverUri: obj.server_uri,
channelCreds: obj.channel_creds.map(validateChannelCredsConfig),
};
}
function validateValue(obj: any): adsTypes.messages.google.protobuf.Value {
if (Array.isArray(obj)) {
return {
kind: 'listValue',
listValue: {
values: obj.map((value) => validateValue(value)),
},
};
} else {
switch (typeof obj) {
case 'boolean':
return {
kind: 'boolValue',
boolValue: obj,
};
case 'number':
return {
kind: 'numberValue',
numberValue: obj,
};
case 'string':
return {
kind: 'stringValue',
stringValue: obj,
};
case 'object':
if (obj === null) {
return {
kind: 'nullValue',
nullValue: 'NULL_VALUE',
};
} else {
return {
kind: 'structValue',
structValue: getStructFromJson(obj),
};
}
default:
throw new Error(`Could not handle struct value of type ${typeof obj}`);
}
}
}
function getStructFromJson(obj: any): adsTypes.messages.google.protobuf.Struct {
if (typeof obj !== 'object' || obj === null) {
throw new Error('Invalid JSON object for Struct field');
}
const result = Object.keys(obj).map((key) => validateValue(key));
if (result.length === 1) {
return {
fields: result[0],
};
} else {
return {
fields: {
kind: 'listValue',
listValue: {
values: result,
},
},
};
}
}
/**
* Validate that the input obj is a valid Node proto message. Only checks the
* fields we expect to see: id, cluster, locality, and metadata.
* @param obj
*/
function validateNode(obj: any): adsTypes.messages.envoy.api.v2.core.Node {
const result: adsTypes.messages.envoy.api.v2.core.Node = {};
if (!('id' in obj)) {
throw new Error('id field missing in node element');
}
if (typeof obj.id !== 'string') {
throw new Error(`node.id field: expected string, got ${typeof obj.id}`);
}
result.id = obj.id;
if (!('cluster' in obj)) {
throw new Error('cluster field missing in node element');
}
if (typeof obj.cluster !== 'string') {
throw new Error(
`node.cluster field: expected string, got ${typeof obj.cluster}`
);
}
result.cluster = obj.cluster;
if (!('locality' in obj)) {
throw new Error('locality field missing in node element');
}
result.locality = {};
if ('region' in obj.locality) {
if (typeof obj.locality.region !== 'string') {
throw new Error(
`node.locality.region field: expected string, got ${typeof obj.locality
.region}`
);
}
result.locality.region = obj.locality.region;
}
if ('zone' in obj.locality) {
if (typeof obj.locality.region !== 'string') {
throw new Error(
`node.locality.zone field: expected string, got ${typeof obj.locality
.zone}`
);
}
result.locality.zone = obj.locality.zone;
}
if ('sub_zone' in obj.locality) {
if (typeof obj.locality.sub_zone !== 'string') {
throw new Error(
`node.locality.sub_zone field: expected string, got ${typeof obj
.locality.sub_zone}`
);
}
result.locality.sub_zone = obj.locality.sub_zone;
}
if ('metadata' in obj) {
result.metadata = getStructFromJson(obj.metadata);
}
return result;
}
function validateBootstrapFile(obj: any): BootstrapInfo {
return {
xdsServers: obj.xds_servers.map(validateXdsServerConfig),
node: validateNode(obj.node),
};
}
let loadedBootstrapInfo: Promise<BootstrapInfo> | null = null;
export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
if (loadedBootstrapInfo !== null) {
return loadedBootstrapInfo;
}
const bootstrapPath = process.env.GRPC_XDS_BOOTSTRAP;
if (bootstrapPath === undefined) {
return Promise.reject(
new Error(
'The GRPC_XDS_BOOTSTRAP environment variable needs to be set to the path to the bootstrap file to use xDS'
)
);
}
loadedBootstrapInfo = new Promise((resolve, reject) => {
fs.readFile(bootstrapPath, { encoding: 'utf8' }, (err, data) => {
if (err) {
reject(
new Error(
`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${err.message}`
)
);
}
try {
const parsedFile = JSON.parse(data);
resolve(validateBootstrapFile(parsedFile));
} catch (e) {
reject(
new Error(
`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}`
)
);
}
});
});
return loadedBootstrapInfo;
}

View File

@ -1,335 +1,399 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as protoLoader from '@grpc/proto-loader';
import { loadPackageDefinition } from './make-client';
import * as adsTypes from './generated/ads';
import * as edsTypes from './generated/endpoint';
import { createGoogleDefaultCredentials } from './channel-credentials';
import { loadBootstrapInfo } from './xds-bootstrap';
import { ClientDuplexStream, ServiceError } from './call';
import { StatusObject } from './call-stream';
import { isIPv4, isIPv6 } from 'net';
import { Status, LogVerbosity } from './constants';
import { Metadata } from './metadata';
import * as logging from './logging';
import { ServiceConfig } from './service-config';
import { ChannelOptions } from './channel-options';
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const clientVersion = require('../../package.json').version;
const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
let loadedProtos: Promise<adsTypes.ProtoGrpcType> | null = null;
function loadAdsProtos(): Promise<adsTypes.ProtoGrpcType> {
if (loadedProtos !== null) {
return loadedProtos;
}
loadedProtos = protoLoader.load([
'envoy/service/discovery/v2/ads.proto',
'envoy/api/v2/listener.proto',
'envoy/api/v2/route.proto',
'envoy/api/v2/cluster.proto',
'envoy/api/v2/endpoint.proto'
], {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
includeDirs: [
'deps/envoy-api/',
'deps/udpa/',
'node_modules/protobufjs/',
'deps/googleapis/',
'deps/protoc-gen-validate/'
]
}).then(packageDefinition => loadPackageDefinition(packageDefinition) as unknown as adsTypes.ProtoGrpcType);
return loadedProtos;
}
export interface Watcher<UpdateType> {
onValidUpdate(update: UpdateType): void;
onTransientError(error: StatusObject): void;
onResourceDoesNotExist(): void;
}
export class XdsClient {
private node: adsTypes.messages.envoy.api.v2.core.Node | null = null;
private client: adsTypes.ClientInterfaces.envoy.service.discovery.v2.AggregatedDiscoveryServiceClient | null = null;
private adsCall: ClientDuplexStream<adsTypes.messages.envoy.api.v2.DiscoveryRequest, adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output> | null = null;
private hasShutdown: boolean = false;
private endpointWatchers: Map<string, Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>[]> = new Map<string, Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>[]>();
private lastEdsVersionInfo: string = '';
private lastEdsNonce: string = '';
constructor(private targetName: string, private serviceConfigWatcher: Watcher<ServiceConfig>, channelOptions: ChannelOptions) {
const channelArgs = {...channelOptions};
const channelArgsToRemove = [
/* The SSL target name override corresponds to the target, and this
* client has its own target */
'grpc.ssl_target_name_override',
/* The default authority also corresponds to the target */
'grpc.default_authority',
/* This client will have its own specific keepalive time setting */
'grpc.keepalive_time_ms',
/* The service config specifies the load balancing policy. This channel
* needs its own separate load balancing policy setting. In particular,
* recursively using an xDS load balancer for the xDS client would be
* bad */
'grpc.service_config'
];
for (const arg of channelArgsToRemove) {
delete channelArgs[arg];
}
channelArgs['grpc.keepalive_time_ms'] = 5000;
Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(([bootstrapInfo, protoDefinitions]) => {
if (this.hasShutdown) {
return;
}
this.node = {
...bootstrapInfo.node,
build_version: `gRPC Node Pure JS ${clientVersion}`,
user_agent_name: 'gRPC Node Pure JS'
}
this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials(), channelArgs);
this.maybeStartAdsStream();
}, (error) => {
trace('Failed to initialize xDS Client. ' + error.message);
// Bubble this error up to any listeners
this.reportStreamError({
code: Status.INTERNAL,
details: `Failed to initialize xDS Client. ${error.message}`,
metadata: new Metadata()
});
});
}
/**
* Start the ADS stream if the client exists and there is not already an
* existing stream, and there
*/
private maybeStartAdsStream() {
if (this.client === null) {
return;
}
if (this.adsCall !== null) {
return;
}
if (this.hasShutdown) {
return;
}
this.adsCall = this.client.StreamAggregatedResources();
this.adsCall.on('data', (message: adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output) => {
switch (message.type_url) {
case EDS_TYPE_URL:
const edsResponses: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output[] = [];
for (const resource of message.resources) {
if (protoLoader.isAnyExtension(resource) && resource['@type'] === EDS_TYPE_URL) {
const resp = resource as protoLoader.AnyExtension & edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output;
if (!this.validateEdsResponse(resp)) {
this.nackEds('ClusterLoadAssignment validation failed');
return;
}
edsResponses.push(resp);
} else {
this.nackEds(`Invalid resource type ${protoLoader.isAnyExtension(resource) ? resource['@type'] : resource.type_url}`);
return;
}
}
for (const message of edsResponses) {
this.handleEdsResponse(message);
}
this.lastEdsVersionInfo = message.version_info;
this.lastEdsNonce = message.nonce;
this.ackEds();
break;
default:
this.nackUnknown(message.type_url, message.version_info, message.nonce);
}
});
this.adsCall.on('error', (error: ServiceError) => {
trace('ADS stream ended. code=' + error.code + ' details= ' + error.details);
this.adsCall = null;
this.reportStreamError(error);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartAdsStream();
});
const endpointWatcherNames = Array.from(this.endpointWatchers.keys());
if (endpointWatcherNames.length > 0) {
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: endpointWatcherNames
});
}
}
private nackUnknown(typeUrl: string, versionInfo: string, nonce: string) {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node!,
type_url: typeUrl,
version_info: versionInfo,
response_nonce: nonce,
error_detail: {
message: `Unknown type_url ${typeUrl}`
}
});
}
/**
* Acknowledge an EDS update. This should be called after the local nonce and
* version info are updated so that it sends the post-update values.
*/
private ackEds() {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo
});
}
/**
* Reject an EDS update. This should be called without updating the local
* nonce and version info.
*/
private nackEds(message: string) {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
error_detail: {
message
}
});
}
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output): boolean {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
return false;
}
}
}
return true;
}
private handleEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output) {
const watchers = this.endpointWatchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
private updateEdsNames() {
if (this.adsCall) {
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo
});
}
}
private reportStreamError(status: StatusObject) {
for (const watcherList of this.endpointWatchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
// Also do the same for other types of watchers when those are implemented
}
addEndpointWatcher(edsServiceName: string, watcher: Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>) {
trace('Watcher added for endpoint ' + edsServiceName);
let watchersEntry = this.endpointWatchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.endpointWatchers.set(edsServiceName, watchersEntry);
}
watchersEntry.push(watcher);
if (addedServiceName) {
this.updateEdsNames();
}
}
removeEndpointWatcher(edsServiceName: string, watcher: Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>) {
trace('Watcher removed for endpoint ' + edsServiceName);
const watchersEntry = this.endpointWatchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.endpointWatchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateEdsNames();
}
}
shutdown(): void {
this.adsCall?.cancel();
this.client?.close();
this.hasShutdown = true;
}
}
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as protoLoader from '@grpc/proto-loader';
import { loadPackageDefinition } from './make-client';
import * as adsTypes from './generated/ads';
import * as edsTypes from './generated/endpoint';
import { createGoogleDefaultCredentials } from './channel-credentials';
import { loadBootstrapInfo } from './xds-bootstrap';
import { ClientDuplexStream, ServiceError } from './call';
import { StatusObject } from './call-stream';
import { isIPv4, isIPv6 } from 'net';
import { Status, LogVerbosity } from './constants';
import { Metadata } from './metadata';
import * as logging from './logging';
import { ServiceConfig } from './service-config';
import { ChannelOptions } from './channel-options';
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const clientVersion = require('../../package.json').version;
const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
let loadedProtos: Promise<adsTypes.ProtoGrpcType> | null = null;
function loadAdsProtos(): Promise<adsTypes.ProtoGrpcType> {
if (loadedProtos !== null) {
return loadedProtos;
}
loadedProtos = protoLoader
.load(
[
'envoy/service/discovery/v2/ads.proto',
'envoy/api/v2/listener.proto',
'envoy/api/v2/route.proto',
'envoy/api/v2/cluster.proto',
'envoy/api/v2/endpoint.proto',
],
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
includeDirs: [
'deps/envoy-api/',
'deps/udpa/',
'node_modules/protobufjs/',
'deps/googleapis/',
'deps/protoc-gen-validate/',
],
}
)
.then(
(packageDefinition) =>
(loadPackageDefinition(
packageDefinition
) as unknown) as adsTypes.ProtoGrpcType
);
return loadedProtos;
}
export interface Watcher<UpdateType> {
onValidUpdate(update: UpdateType): void;
onTransientError(error: StatusObject): void;
onResourceDoesNotExist(): void;
}
export class XdsClient {
private node: adsTypes.messages.envoy.api.v2.core.Node | null = null;
private client: adsTypes.ClientInterfaces.envoy.service.discovery.v2.AggregatedDiscoveryServiceClient | null = null;
private adsCall: ClientDuplexStream<
adsTypes.messages.envoy.api.v2.DiscoveryRequest,
adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output
> | null = null;
private hasShutdown = false;
private endpointWatchers: Map<
string,
Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>[]
> = new Map<
string,
Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>[]
>();
private lastEdsVersionInfo = '';
private lastEdsNonce = '';
constructor(
private targetName: string,
private serviceConfigWatcher: Watcher<ServiceConfig>,
channelOptions: ChannelOptions
) {
const channelArgs = { ...channelOptions };
const channelArgsToRemove = [
/* The SSL target name override corresponds to the target, and this
* client has its own target */
'grpc.ssl_target_name_override',
/* The default authority also corresponds to the target */
'grpc.default_authority',
/* This client will have its own specific keepalive time setting */
'grpc.keepalive_time_ms',
/* The service config specifies the load balancing policy. This channel
* needs its own separate load balancing policy setting. In particular,
* recursively using an xDS load balancer for the xDS client would be
* bad */
'grpc.service_config',
];
for (const arg of channelArgsToRemove) {
delete channelArgs[arg];
}
channelArgs['grpc.keepalive_time_ms'] = 5000;
Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(
([bootstrapInfo, protoDefinitions]) => {
if (this.hasShutdown) {
return;
}
this.node = {
...bootstrapInfo.node,
build_version: `gRPC Node Pure JS ${clientVersion}`,
user_agent_name: 'gRPC Node Pure JS',
};
this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
bootstrapInfo.xdsServers[0].serverUri,
createGoogleDefaultCredentials(),
channelArgs
);
this.maybeStartAdsStream();
},
(error) => {
trace('Failed to initialize xDS Client. ' + error.message);
// Bubble this error up to any listeners
this.reportStreamError({
code: Status.INTERNAL,
details: `Failed to initialize xDS Client. ${error.message}`,
metadata: new Metadata(),
});
}
);
}
/**
* Start the ADS stream if the client exists and there is not already an
* existing stream, and there
*/
private maybeStartAdsStream() {
if (this.client === null) {
return;
}
if (this.adsCall !== null) {
return;
}
if (this.hasShutdown) {
return;
}
this.adsCall = this.client.StreamAggregatedResources();
this.adsCall.on(
'data',
(message: adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output) => {
switch (message.type_url) {
case EDS_TYPE_URL: {
const edsResponses: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output[] = [];
for (const resource of message.resources) {
if (
protoLoader.isAnyExtension(resource) &&
resource['@type'] === EDS_TYPE_URL
) {
const resp = resource as protoLoader.AnyExtension &
edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output;
if (!this.validateEdsResponse(resp)) {
this.nackEds('ClusterLoadAssignment validation failed');
return;
}
edsResponses.push(resp);
} else {
this.nackEds(
`Invalid resource type ${
protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url
}`
);
return;
}
}
for (const message of edsResponses) {
this.handleEdsResponse(message);
}
this.lastEdsVersionInfo = message.version_info;
this.lastEdsNonce = message.nonce;
this.ackEds();
break;
}
default:
this.nackUnknown(
message.type_url,
message.version_info,
message.nonce
);
}
}
);
this.adsCall.on('error', (error: ServiceError) => {
trace(
'ADS stream ended. code=' + error.code + ' details= ' + error.details
);
this.adsCall = null;
this.reportStreamError(error);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartAdsStream();
});
const endpointWatcherNames = Array.from(this.endpointWatchers.keys());
if (endpointWatcherNames.length > 0) {
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: endpointWatcherNames,
});
}
}
private nackUnknown(typeUrl: string, versionInfo: string, nonce: string) {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node!,
type_url: typeUrl,
version_info: versionInfo,
response_nonce: nonce,
error_detail: {
message: `Unknown type_url ${typeUrl}`,
},
});
}
/**
* Acknowledge an EDS update. This should be called after the local nonce and
* version info are updated so that it sends the post-update values.
*/
private ackEds() {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
});
}
/**
* Reject an EDS update. This should be called without updating the local
* nonce and version info.
*/
private nackEds(message: string) {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
error_detail: {
message,
},
});
}
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateEdsResponse(
message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output
): boolean {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
return false;
}
}
}
return true;
}
private handleEdsResponse(
message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output
) {
const watchers = this.endpointWatchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
private updateEdsNames() {
if (this.adsCall) {
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
});
}
}
private reportStreamError(status: StatusObject) {
for (const watcherList of this.endpointWatchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
// Also do the same for other types of watchers when those are implemented
}
addEndpointWatcher(
edsServiceName: string,
watcher: Watcher<
edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output
>
) {
trace('Watcher added for endpoint ' + edsServiceName);
let watchersEntry = this.endpointWatchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.endpointWatchers.set(edsServiceName, watchersEntry);
}
watchersEntry.push(watcher);
if (addedServiceName) {
this.updateEdsNames();
}
}
removeEndpointWatcher(
edsServiceName: string,
watcher: Watcher<
edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output
>
) {
trace('Watcher removed for endpoint ' + edsServiceName);
const watchersEntry = this.endpointWatchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.endpointWatchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateEdsNames();
}
}
shutdown(): void {
this.adsCall?.cancel();
this.client?.close();
this.hasShutdown = true;
}
}