2023-05-22 14:51:59 +08:00

128 lines
3.6 KiB
JavaScript

'use strict';
const debug = require('util').debuglog('rpc-client');
const assert = require('assert');
const Base = require('sdk-base');
const protocol = require('sofa-bolt-node');
const Scheduler = require('./scheduler');
const RpcConsumer = require('./consumer');
const RpcConnection = require('./connection/rpc');
const ConnectionManager = require('./connection_mgr');
/**
 * Baseline options for every RpcClient.
 *
 * The RpcClient constructor copies these into a fresh object and then lays
 * the caller-supplied options on top, so any key given by the caller wins.
 * The copy is shallow: a caller-supplied `connectionOpts` replaces the one
 * below as a whole rather than being merged key by key.
 */
const defaultOptions = {
  group: 'SOFA',
  version: '1.0',
  responseTimeout: 3000,
  // Pluggable implementation classes; callers may substitute their own.
  consumerClass: RpcConsumer,
  connectionClass: RpcConnection,
  connectionManagerClass: ConnectionManager,
  connectionOpts: {
    protocol,
    // Disable Nagle's algorithm by default.
    noDelay: true,
    // Connection timeout (presumably milliseconds; confirm against the connection class).
    connectTimeout: 3000,
  },
};
/**
 * RPC client: owns one connection manager, creates (and by default caches)
 * one consumer per service key, and re-emits consumer / connection manager
 * events on itself.
 */
class RpcClient extends Base {
  /**
   * @param {Object} options - client options, shallow-merged over `defaultOptions`
   * @param {Object} options.logger - required; construction fails without it
   * @param {Object} [options.protocol] - when truthy, overrides `connectionOpts.protocol`
   * @param {Object} [options.connectionOpts] - replaces the default connection options as a whole
   * @throws {AssertionError} when `options.logger` is missing
   */
  constructor(options = {}) {
    assert(options.logger, '[RpcClient] options.logger is required');
    options = Object.assign({}, defaultOptions, options);
    // Object.assign copies one level only, so at this point `connectionOpts`
    // is still the shared `defaultOptions.connectionOpts` object (or the very
    // object the caller passed in). Clone it so the protocol override below
    // stays local to this client instead of leaking into the module-level
    // defaults or mutating the caller's object.
    options.connectionOpts = Object.assign({}, options.connectionOpts);
    super(options);
    if (options.protocol) this.options.connectionOpts.protocol = options.protocol;

    // Ensure only one consumer is created per service key.
    this._consumerCache = new Map();

    this.connectionManager = new options.connectionManagerClass(options);
    this.connectionManager.on('error', err => { this.emit('error', err); });

    // Middlewares handed to every consumer created by this client.
    this._middlewares = [];

    // Nothing asynchronous to wait for: mark the client ready right away.
    this.ready(true);
  }

  /**
   * Closes every cached consumer, empties the cache and closes the
   * connection manager.
   *
   * NOTE(review): `Scheduler.instance` looks like a process-wide singleton,
   * so clearing it here may affect other clients as well - confirm.
   *
   * @return {*} whatever `connectionManager.close()` returns
   */
  close() {
    Scheduler.instance.clear();
    for (const consumer of this._consumerCache.values()) {
      consumer.close();
    }
    this._consumerCache.clear();
    return this.connectionManager.close();
  }

  /**
   * Cached consumers, keyed by the string produced by `formatKey()`.
   * @return {Map}
   */
  get consumerMap() {
    return this._consumerCache;
  }

  /**
   * Class instantiated by `createConsumer()` when no explicit class is passed.
   */
  get consumerClass() {
    return this.options.consumerClass;
  }

  set consumerClass(val) {
    this.options.consumerClass = val;
  }

  /**
   * Registers one middleware, or an array of them, on this client and on
   * every consumer currently in the cache.
   *
   * NOTE(review): the array branch replaces `_middlewares` with a new array
   * while the single branch pushes onto the existing one, which is the same
   * array object handed to consumers at creation time - confirm this
   * asymmetry is intended.
   *
   * @param {Function|Function[]} mw - middleware(s) to add
   */
  use(mw) {
    if (Array.isArray(mw)) {
      this._middlewares = this._middlewares.concat(mw);
    } else {
      this._middlewares.push(mw);
    }
    for (const consumer of this.consumerMap.values()) {
      consumer.use(mw);
    }
  }

  /**
   * Convenience one-shot call: creates (or reuses) a consumer, waits for it
   * to be ready, then invokes the method. Usually only used in unit tests.
   *
   * @param {Object} opt - consumer options plus `methodName`, `args` and `options`
   * @return {Promise<*>} the result of `consumer.invoke()`
   */
  async invoke(opt) {
    const consumer = this.createConsumer(opt);
    const { methodName, args, options } = opt;
    await consumer.ready();
    return await consumer.invoke(methodName, args, options);
  }

  /**
   * Returns the consumer for the given service, creating it when needed.
   * With `options.cache` left at its default of `true`, consumers are cached
   * by `formatKey(options)` and reused; with `cache: false` a new, uncached
   * consumer is built on every call.
   *
   * @param {Object} options - consumer options, merged over the client options
   * @param {String} options.interfaceName - required service interface name
   * @param {Function} [consumerClass] - overrides `this.consumerClass` for this call
   * @return {Object} the consumer instance
   * @throws {AssertionError} when `options.interfaceName` is not a string
   */
  createConsumer(options, consumerClass) {
    assert(typeof options.interfaceName === 'string', '[RpcClient] createConsumer(options) options.interfaceName is required and should be a string.');
    options = Object.assign({
      middlewares: this._middlewares,
      connectionManager: this.connectionManager,
      cache: true,
    }, this.options, options);
    const key = this.formatKey(options);
    let consumer = this._consumerCache.get(key);
    if (!options.cache || !consumer) {
      debug('create consumer for %s', key);
      consumerClass = consumerClass || this.consumerClass;
      consumer = new consumerClass(options);
      if (options.cache) {
        this._consumerCache.set(key, consumer);
      }
      // Delegate the consumer's events to the client.
      consumer.on('error', err => { this.emit('error', err); });
      consumer.on('request', req => { this.emit('request', req); });
      consumer.on('response', info => { this.emit('response', info); });
      // Drop the cache entry once the consumer closes so a later call
      // builds a fresh one.
      consumer.once('close', () => {
        if (options.cache) {
          this._consumerCache.delete(key);
        }
      });
    }
    return consumer;
  }

  /**
   * Builds the cache key `interfaceName:version@group`, followed by
   * `@serverHost` and `@targetAppName` when those options are set.
   *
   * @param {Object} options - merged consumer options
   * @return {String} the cache key
   */
  formatKey(options) {
    const { interfaceName, version, group, serverHost } = options;
    let key = interfaceName + ':' + version + '@' + group;
    if (serverHost) {
      key += '@' + serverHost;
    }
    if (options.targetAppName) {
      key += '@' + options.targetAppName;
    }
    return key;
  }
}
// The module's sole export is the RpcClient class itself.
module.exports = RpcClient;