grpc-js-xds: interop server: Add support for sending custom metrics

This commit is contained in:
Michael Lumish 2025-09-10 11:15:23 -07:00
parent a7dfb681b1
commit f2d1776b74

View File

@ -232,12 +232,36 @@ function getIPv6Addresses(): string[] {
return ipv6Addresses;
}
interface ConfiguredMetrics {
qps?: number;
applicationUtilization?: number;
eps?: number;
}
function createInBandMetricsInterceptor(metrics: ConfiguredMetrics) {
return function inBandMetricsInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
const metricsRecorder = call.getMetricsRecorder()
if (metrics.qps) {
metricsRecorder.recordQpsMetric(metrics.qps);
}
if (metrics.applicationUtilization) {
metricsRecorder.recordApplicationUtilizationMetric(metrics.applicationUtilization);
}
if (metrics.eps) {
metricsRecorder.recordEpsMetric(metrics.eps);
}
return new grpc.ServerInterceptingCall(call);
}
}
async function main() {
const argv = yargs
.string(['port', 'maintenance_port', 'address_type', 'secure_mode'])
.string(['port', 'maintenance_port', 'address_type', 'secure_mode', 'metrics_mode'])
.number(['qps', 'application_utilization', 'eps'])
.demandOption(['port'])
.default('address_type', 'IPV4_IPV6')
.default('secure_mode', 'false')
.default('metrics_mode', 'NONE')
.parse()
console.log('Starting xDS interop server. Args: ', argv);
const healthImpl = new HealthImplementation({'': 'NOT_SERVING'});
@ -253,7 +277,28 @@ async function main() {
}
const reflection = new ReflectionService(packageDefinition, {
services: ['grpc.testing.TestService']
})
});
let metricInterceptor: grpc.ServerInterceptor | null = null;
const metricsMode = argv.metrics_mode.toUpperCase();
let metricRecorder: grpc.ServerMetricRecorder | null = null;
if (metricsMode === 'IN_BAND') {
metricInterceptor = createInBandMetricsInterceptor({
qps: argv.qps,
applicationUtilization: argv.application_utilization,
eps: argv.eps
});
} else if (metricsMode === 'OUT_OF_BAND') {
metricRecorder = new grpc.ServerMetricRecorder();
if (argv.qps) {
metricRecorder.setQpsMetric(argv.qps);
}
if (argv.application_utilization) {
metricRecorder.setApplicationUtilizationMetric(argv.application_utilization);
}
if (argv.eps) {
metricRecorder.setEpsMetric(argv.eps);
}
}
const addressType = argv.address_type.toUpperCase();
const secureMode = argv.secure_mode.toLowerCase() == 'true';
if (secureMode) {
@ -266,19 +311,33 @@ async function main() {
reflection.addToServer(maintenanceServer);
grpc.addAdminServicesToServer(maintenanceServer);
const server = new grpc_xds.XdsServer({interceptors: [testInfoInterceptor]});
const interceptorList = [testInfoInterceptor];
if (metricInterceptor) {
interceptorList.push(metricInterceptor);
}
const server = new grpc_xds.XdsServer({interceptors: interceptorList});
server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
if (metricRecorder) {
metricRecorder.addToServer(server);
}
const xdsCreds = new grpc_xds.XdsServerCredentials(grpc.ServerCredentials.createInsecure());
await Promise.all([
serverBindPromise(maintenanceServer, `[::]:${argv.maintenance_port}`, grpc.ServerCredentials.createInsecure()),
serverBindPromise(server, `0.0.0.0:${argv.port}`, xdsCreds)
]);
} else {
const interceptorList = [unifiedInterceptor];
if (metricInterceptor) {
interceptorList.push(metricInterceptor);
}
const server = new grpc.Server({interceptors: [unifiedInterceptor]});
server.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl);
healthImpl.addToServer(server);
reflection.addToServer(server);
grpc.addAdminServicesToServer(server);
if (metricRecorder) {
metricRecorder.addToServer(server);
}
server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
const creds = grpc.ServerCredentials.createInsecure();
switch (addressType) {