From 5759b70639e4e8ca74e529c7b7bbde8c2cf02318 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 27 Jul 2023 11:11:10 -0700 Subject: [PATCH] Fix some issues with the benchmark code --- test/any_grpc.js | 2 +- test/package.json | 4 +- test/performance/benchmark_server.js | 3 +- test/performance/driver.js | 82 ++++++++++++++++++++++++++++ test/performance/worker.js | 4 +- 5 files changed, 90 insertions(+), 5 deletions(-) create mode 100644 test/performance/driver.js diff --git a/test/any_grpc.js b/test/any_grpc.js index 2f161ff7..ec0b2b74 100644 --- a/test/any_grpc.js +++ b/test/any_grpc.js @@ -25,7 +25,7 @@ function getImplementation(globalField) { const impl = global[globalField]; if (impl === 'js') { - return require(`../packages/grpc-${impl}`); + return require('../packages/grpc-js'); } else if (impl === 'native') { return require('grpc'); } diff --git a/test/package.json b/test/package.json index 4f42f7bf..963093bc 100644 --- a/test/package.json +++ b/test/package.json @@ -16,8 +16,10 @@ "dependencies": { "express": "^4.16.3", "google-auth-library": "^6.1.0", - "grpc": "^1.24.2", "lodash": "^4.17.4", "poisson-process": "^1.0.0" + }, + "optionalDependencies": { + "grpc": "^1.24.2" } } diff --git a/test/performance/benchmark_server.js b/test/performance/benchmark_server.js index 64128b9d..0d6c7178 100644 --- a/test/performance/benchmark_server.js +++ b/test/performance/benchmark_server.js @@ -154,8 +154,9 @@ util.inherits(BenchmarkServer, EventEmitter); * Start the benchmark server. */ BenchmarkServer.prototype.start = function() { - this.server.bindAsync(this.host + ':' + this.port, this.creds, (err) => { + this.server.bindAsync(this.host + ':' + this.port, this.creds, (err, port) => { assert.ifError(err); + this.port = port; this.server.start(); this.last_wall_time = process.hrtime(); this.last_usage = process.cpuUsage(); diff --git a/test/performance/driver.js b/test/performance/driver.js new file mode 100644 index 00000000..fa0e8756 --- /dev/null +++ b/test/performance/driver.js @@ -0,0 +1,82 @@ +const grpc = require('../any_grpc').server; +const protoLoader = require('../../packages/proto-loader'); +const protoPackage = protoLoader.loadSync( + 'src/proto/grpc/testing/worker_service.proto', + {keepCase: true, + defaults: true, + enums: String, + oneofs: true, + includeDirs: [__dirname + '/../proto/']}); +const serviceProto = grpc.loadPackageDefinition(protoPackage).grpc.testing; + +function main() { + const parseArgs = require('minimist'); + const argv = parseArgs(process.argv, { + string: ['client_worker_port', 'server_worker_port'] + }); + const clientWorker = new serviceProto.WorkerService(`localhost:${argv.client_worker_port}`, grpc.credentials.createInsecure()); + const serverWorker = new serviceProto.WorkerService(`localhost:${argv.server_worker_port}`, grpc.credentials.createInsecure()); + const serverWorkerStream = serverWorker.runServer(); + const clientWorkerStream = clientWorker.runClient(); + let firstServerResponseReceived = false; + let markCount = 0; + serverWorkerStream.on('data', (response) => { + console.log('Server stats:', response.stats); + if (!firstServerResponseReceived) { + firstServerResponseReceived = true; + clientWorkerStream.write({ + setup: { + server_targets: [`localhost:${response.port}`], + client_channels: 1, + outstanding_rpcs_per_channel: 1, + histogram_params: { + resolution: 0.01, + max_possible:60000000000 + }, + payload_config: { + bytebuf_params: { + req_size: 10, + resp_size: 10 + } + }, + load_params: { + closed_loop: {} + } + } + }); + clientWorkerStream.on('status', (status) => { + console.log('Received client worker status ' + JSON.stringify(status)); + serverWorkerStream.end(); + }); + const markInterval = setInterval(() => { + if (markCount >= 5) { + clientWorkerStream.end(); + clearInterval(markInterval); + } else { + clientWorkerStream.write({ + mark: {} + }); + serverWorkerStream.write({ + mark: {} + }); + } + markCount += 1; + }, 1000); + } + }); + clientWorkerStream.on('data', (response) => { + console.log('Client stats:', response.stats); + }); + serverWorkerStream.write({ + setup: { + port: 0 + } + }); + serverWorkerStream.on('status', (status) => { + console.log('Received server worker status ' + JSON.stringify(status)); + }); +} + +if (require.main === module) { + main(); +} diff --git a/test/performance/worker.js b/test/performance/worker.js index 86f17df2..786d7056 100644 --- a/test/performance/worker.js +++ b/test/performance/worker.js @@ -39,13 +39,13 @@ function runServer(port, benchmark_impl, callback) { server.addService(serviceProto.WorkerService.service, new WorkerServiceImpl(benchmark_impl, server)); var address = '0.0.0.0:' + port; - server.bindAsync(address, server_creds, (err) => { + server.bindAsync(address, server_creds, (err, port) => { if (err) { return callback(err); } server.start(); - console.log('running QPS worker on %s', address); + console.log('running QPS worker on 0.0.0.0:%s', port); callback(null, server); }); }