mirror of
https://github.com/sofastack/sofa-rpc-node.git
synced 2025-12-08 20:26:00 +00:00
128 lines
3.6 KiB
JavaScript
128 lines
3.6 KiB
JavaScript
'use strict';
|
|
|
|
const debug = require('util').debuglog('rpc-client');
|
|
const assert = require('assert');
|
|
const Base = require('sdk-base');
|
|
const protocol = require('sofa-bolt-node');
|
|
const Scheduler = require('./scheduler');
|
|
const RpcConsumer = require('./consumer');
|
|
const RpcConnection = require('./connection/rpc');
|
|
const ConnectionManager = require('./connection_mgr');
|
|
|
|
/**
 * Baseline settings for RpcClient. The constructor shallow-merges the caller's
 * options over this object, so a caller-supplied `connectionOpts` replaces the
 * nested object below as a whole rather than being merged into it.
 */
const defaultOptions = {
  group: 'SOFA',
  version: '1.0',
  responseTimeout: 3000, // presumably milliseconds - confirm against the consumer
  consumerClass: RpcConsumer,
  connectionClass: RpcConnection,
  connectionManagerClass: ConnectionManager,
  connectionOpts: {
    protocol: protocol,
    // Nagle's algorithm is disabled by default.
    noDelay: true,
    // Connection timeout.
    connectTimeout: 3000,
  },
};
|
|
|
|
/**
 * RPC client that creates consumers for services, caches them by service key
 * and hands every consumer the same connection manager.
 *
 * 'error' events from the connection manager and from consumers, as well as
 * consumer 'request' and 'response' events, are re-emitted on the client.
 */
class RpcClient extends Base {
  /**
   * @param {Object} options - client options, shallow-merged over `defaultOptions`
   * @param {Object} options.logger - required; construction asserts its presence
   * @param {Object} [options.protocol] - when set, overrides `connectionOpts.protocol`
   * @param {Object} [options.connectionOpts] - replaces the default connection options as a whole
   */
  constructor(options = {}) {
    assert(options.logger, '[RpcClient] options.logger is required');
    options = Object.assign({}, defaultOptions, options);
    // The merge above is shallow: `connectionOpts` still points at either the
    // module-level default object or the caller's own object. Work on a private
    // copy so the protocol override below can neither leak into other clients
    // nor mutate the caller's input.
    options.connectionOpts = Object.assign({}, options.connectionOpts);
    super(options);

    // Relies on the base class exposing the options as `this.options`.
    if (options.protocol) this.options.connectionOpts.protocol = options.protocol;

    // Guarantees that a service gets only one (cached) consumer.
    this._consumerCache = new Map();
    this.connectionManager = new options.connectionManagerClass(options);
    this.connectionManager.on('error', err => { this.emit('error', err); });

    // Middlewares registered through use().
    this._middlewares = [];
    // Ready immediately.
    this.ready(true);
  }

  /**
   * Clears the scheduler singleton, closes every cached consumer, empties the
   * cache and closes the connection manager.
   *
   * @return {*} whatever `connectionManager.close()` returns
   */
  close() {
    Scheduler.instance.clear();
    for (const consumer of this._consumerCache.values()) {
      consumer.close();
    }
    this._consumerCache.clear();
    return this.connectionManager.close();
  }

  /**
   * The live cache of consumers, keyed by the string built in formatKey().
   * @return {Map} the internal map itself, not a copy
   */
  get consumerMap() {
    return this._consumerCache;
  }

  /**
   * Constructor used by createConsumer() when no class is passed explicitly.
   */
  get consumerClass() {
    return this.options.consumerClass;
  }

  set consumerClass(val) {
    this.options.consumerClass = val;
  }

  /**
   * Registers one middleware or an array of middlewares on the client and
   * forwards the same argument to every cached consumer.
   *
   * @param {Function|Array} mw - a middleware or a list of middlewares
   */
  use(mw) {
    // NOTE(review): the array branch replaces `_middlewares` with a new array,
    // while the single branch mutates the existing one in place; consumers were
    // handed the array reference at creation time - confirm whether the
    // consumer copies it.
    if (Array.isArray(mw)) {
      this._middlewares = this._middlewares.concat(mw);
    } else {
      this._middlewares.push(mw);
    }
    for (const consumer of this.consumerMap.values()) {
      consumer.use(mw);
    }
  }

  /**
   * One-shot call: resolves a consumer for `opt`, waits until it is ready and
   * invokes the method. Usually only used in unit tests.
   *
   * @param {Object} opt - createConsumer() options plus `methodName`, `args`, `options`
   * @return {Promise<*>} the result of `consumer.invoke()`
   */
  async invoke(opt) {
    const consumer = this.createConsumer(opt);
    const { methodName, args, options } = opt;
    await consumer.ready();
    return await consumer.invoke(methodName, args, options);
  }

  /**
   * Returns the cached consumer for the service described by `options`, or
   * creates a new one. With `options.cache === false` a fresh, uncached
   * consumer is created on every call.
   *
   * @param {Object} options - consumer options, merged over the client options
   * @param {String} options.interfaceName - required service interface name
   * @param {Boolean} [options.cache=true] - whether to reuse / store the consumer
   * @param {Function} [consumerClass] - constructor to use instead of `this.consumerClass`
   * @return {Object} the consumer instance
   */
  createConsumer(options, consumerClass) {
    assert(typeof options.interfaceName === 'string', '[RpcClient] createConsumer(options) options.interfaceName is required and should be a string.');
    options = Object.assign({
      middlewares: this._middlewares,
      connectionManager: this.connectionManager,
      cache: true,
    }, this.options, options);
    const key = this.formatKey(options);

    const cached = this._consumerCache.get(key);
    if (options.cache && cached) {
      return cached;
    }

    debug('create consumer for %s', key);
    consumerClass = consumerClass || this.consumerClass;
    const consumer = new consumerClass(options);
    if (options.cache) {
      this._consumerCache.set(key, consumer);
    }
    // Delegate the consumer's events to the client.
    consumer.on('error', err => { this.emit('error', err); });
    consumer.on('request', req => { this.emit('request', req); });
    consumer.on('response', info => { this.emit('response', info); });
    consumer.once('close', () => {
      // Only evict the entry if it still belongs to this consumer; a late
      // 'close' from an old consumer must not drop a newer one stored under
      // the same key.
      if (options.cache && this._consumerCache.get(key) === consumer) {
        this._consumerCache.delete(key);
      }
    });
    return consumer;
  }

  /**
   * Builds the cache key `interfaceName:version@group`, followed by
   * `@serverHost` and `@targetAppName` when those options are truthy.
   *
   * @param {Object} options - consumer options
   * @return {String} cache key
   */
  formatKey(options) {
    const { interfaceName, version, group, serverHost, targetAppName } = options;
    let key = `${interfaceName}:${version}@${group}`;
    if (serverHost) {
      key += `@${serverHost}`;
    }
    if (targetAppName) {
      key += `@${targetAppName}`;
    }
    return key;
  }
}
|
|
|
|
// The client class is this module's only export.
module.exports = RpcClient;
|