This commit is contained in:
Michael Lumish 2020-07-13 13:11:54 -07:00
parent f91c837058
commit a0b050aa0c

View File

@ -102,10 +102,7 @@ export class XdsClient {
private endpointWatchers: Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
> = new Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
>();
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();
private lastEdsVersionInfo = '';
private lastEdsNonce = '';
@ -177,52 +174,49 @@ export class XdsClient {
return;
}
this.adsCall = this.client.StreamAggregatedResources();
this.adsCall.on(
'data',
(message: DiscoveryResponse__Output) => {
switch (message.type_url) {
case EDS_TYPE_URL: {
const edsResponses: ClusterLoadAssignment__Output[] = [];
for (const resource of message.resources) {
if (
protoLoader.isAnyExtension(resource) &&
resource['@type'] === EDS_TYPE_URL
) {
const resp = resource as protoLoader.AnyExtension &
ClusterLoadAssignment__Output;
if (!this.validateEdsResponse(resp)) {
this.nackEds('ClusterLoadAssignment validation failed');
return;
}
edsResponses.push(resp);
} else {
this.nackEds(
`Invalid resource type ${
protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url
}`
);
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
switch (message.type_url) {
case EDS_TYPE_URL: {
const edsResponses: ClusterLoadAssignment__Output[] = [];
for (const resource of message.resources) {
if (
protoLoader.isAnyExtension(resource) &&
resource['@type'] === EDS_TYPE_URL
) {
const resp = resource as protoLoader.AnyExtension &
ClusterLoadAssignment__Output;
if (!this.validateEdsResponse(resp)) {
this.nackEds('ClusterLoadAssignment validation failed');
return;
}
edsResponses.push(resp);
} else {
this.nackEds(
`Invalid resource type ${
protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url
}`
);
return;
}
for (const message of edsResponses) {
this.handleEdsResponse(message);
}
this.lastEdsVersionInfo = message.version_info;
this.lastEdsNonce = message.nonce;
this.ackEds();
break;
}
default:
this.nackUnknown(
message.type_url,
message.version_info,
message.nonce
);
for (const message of edsResponses) {
this.handleEdsResponse(message);
}
this.lastEdsVersionInfo = message.version_info;
this.lastEdsNonce = message.nonce;
this.ackEds();
break;
}
default:
this.nackUnknown(
message.type_url,
message.version_info,
message.nonce
);
}
);
});
this.adsCall.on('error', (error: ServiceError) => {
trace(
'ADS stream ended. code=' + error.code + ' details= ' + error.details
@ -301,9 +295,7 @@ export class XdsClient {
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateEdsResponse(
message: ClusterLoadAssignment__Output
): boolean {
private validateEdsResponse(message: ClusterLoadAssignment__Output): boolean {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
@ -321,9 +313,7 @@ export class XdsClient {
return true;
}
private handleEdsResponse(
message: ClusterLoadAssignment__Output
) {
private handleEdsResponse(message: ClusterLoadAssignment__Output) {
const watchers = this.endpointWatchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
@ -353,9 +343,7 @@ export class XdsClient {
addEndpointWatcher(
edsServiceName: string,
watcher: Watcher<
ClusterLoadAssignment__Output
>
watcher: Watcher<ClusterLoadAssignment__Output>
) {
trace('Watcher added for endpoint ' + edsServiceName);
let watchersEntry = this.endpointWatchers.get(edsServiceName);
@ -373,9 +361,7 @@ export class XdsClient {
removeEndpointWatcher(
edsServiceName: string,
watcher: Watcher<
ClusterLoadAssignment__Output
>
watcher: Watcher<ClusterLoadAssignment__Output>
) {
trace('Watcher removed for endpoint ' + edsServiceName);
const watchersEntry = this.endpointWatchers.get(edsServiceName);