2022-03-17 22:19:31 +08:00

306 lines
8.6 KiB
JavaScript

'use strict';
const net = require('net');
const assert = require('assert');
const Base = require('sdk-base');
const qs = require('querystring');
const graceful = require('graceful');
const awaitFirst = require('await-first');
const RpcService = require('./service');
const RpcResponse = require('./response');
const RpcConnection = require('./connection');
const RpcClient = require('../client').RpcClient;
const localIp = require('address').ip();
const _testClient = Symbol.for('RpcServer#testClient');
const defaultOptions = {
port: 12200,
group: 'SOFA',
version: '1.0',
protocol: require('sofa-bolt-node'),
serviceClass: RpcService,
responseClass: RpcResponse,
connectionClass: RpcConnection,
idleTime: 5000,
killTimeout: 30000,
maxIdleTime: 90 * 1000,
responseTimeout: 3000,
codecType: 'hessian2',
classCacheClass: Map,
};
class RpcServer extends Base {
/**
* Rpc 服务提供方
*
* @param {Object} options
* - {String} appName - 应用名称
* - {Registry} registry - 配置中心客户端,用于发布服务地址
* - {Logger} logger - 日志对象,推荐使用 egg-logger 实例
* - {String} [group=SOFA] - 默认分组
* - {String} [codecType=hessian2] - 序列化协议类型,支持 hessian2 / protobuf 等
* - {Protocol} [protocol=require('sofa-bolt-node')] - rpc 通讯协议具体实现
* - {Number} [idleTime=5000] - 客户端心跳间隔
* - {Number} [maxIdleTime=30000] - 客户端最大空闲间隔,超过该间隔没有收到数据,则断开连接
* @class
*/
constructor(options) {
assert(options.logger, '[RpcServer] options.logger is required');
super(Object.assign({}, defaultOptions, options));
this._started = false;
this._localIp = localIp;
this._servers = [];
this._services = new Map(); // <id, Service>
this._connections = new Map(); // <address, Connection>
this.classMap = options.classMap;
this.proto = options.proto;
this.publishAddress = this.options.publishAddress || this._localIp;
this.publishPort = this.options.port;
this.serviceClass = this.options.serviceClass;
this.responseClass = this.options.responseClass;
this.connectionClass = this.options.connectionClass;
}
// 给单元测试用
get testClient() {
if (!this[_testClient]) {
this[_testClient] = new RpcClient({
logger: this.logger,
connectionOpts: {
proto: this.proto,
classMap: this.classMap,
protocol: this.protocol,
codecType: this.options.codecType,
},
});
}
return this[_testClient];
}
get services() {
return this._services;
}
get protocol() {
return this.options.protocol;
}
get listenPorts() {
return [ this.publishPort ];
}
get logger() {
return this.options.logger;
}
get registry() {
return this.options.registry;
}
get url() {
const type = this.protocol.name || 'bolt';
// uniqueId=&version=1.0&timeout=0&delay=-1&id=rpc-cfg-0&dynamic=true&weight=100&accepts=100000&startTime=1526050447423&pid=13862&language=java&rpcVer=50400
return type + '://' + this.publishAddress + ':' + this.publishPort + '?' + qs.stringify(this.params);
}
get params() {
const obj = {
startTime: Date.now(),
pid: process.pid,
uniqueId: '',
dynamic: 'true',
appName: this.options.appName,
timeout: this.options.responseTimeout,
serialization: this.options.codecType,
weight: 100,
accepts: 100000,
language: 'nodejs',
rpcVer: '50400',
protocol: this.protocol.name,
};
return obj;
}
// https://nodejs.org/api/net.html#net_server_getconnections_callback
getConnections(callback) {
if (callback) {
callback(null, this._connections.size);
}
}
/**
* Adds a service.
*
* @param {Object} info The information
* @param {Object} delegate The delegate
* @return {void}
*/
addService(info, delegate) {
if (typeof info === 'string') {
info = { interfaceName: info };
}
const service = new this.serviceClass(Object.assign({
registry: this.registry,
logger: this.logger,
group: this.options.group,
version: this.options.version,
delegate,
}, info));
service.on('error', err => { this.emit('error', err); });
if (this._services.has(service.id)) {
this.logger.warn('[RpcServer] service: %s already added, will override it', service.id);
}
this._services.set(service.id, service);
}
_startServer(port) {
const server = net.createServer();
server.once('error', err => { this.emit('error', err); });
server.on('connection', socket => { this._handleSocket(socket); });
server.listen(port, () => {
const realPort = server.address().port;
if (port === this.publishPort && port === 0) {
this.publishPort = realPort;
}
this.logger.info('[RpcServer] server start on %s', realPort);
});
return server;
}
/**
* Start the rpc server
*
* @return {Promise} promise
*/
async start() {
if (!this._started) {
this._started = true;
for (const port of this.listenPorts) {
const server = this._startServer(port);
this._servers.push(server);
}
graceful({
killTimeout: this.options.killTimeout,
server: this._servers,
error: () => {
this._handleUncaughtError();
},
});
Promise.all(this._servers.map(server => awaitFirst(server, [ 'listening', 'error' ])))
.then(() => {
this.emit('listening');
this.ready(true);
}, err => {
this.ready(err);
});
}
return this.ready();
}
publish() {
const tasks = [];
for (const service of this._services.values()) {
tasks.push(service.publish(this.url));
}
return Promise.all(tasks);
}
unPublish() {
const tasks = [];
for (const service of this._services.values()) {
tasks.push(service.unPublish());
}
return Promise.all(tasks);
}
async close() {
// 1. 取消服务注册
await this.unPublish();
// 2. 关闭 tcp server
for (const server of this._servers) {
server.close();
}
// 3. 强制关闭连接
const closeTasks = [];
for (const connection of this._connections.values()) {
closeTasks.push(connection.close());
}
await Promise.all(closeTasks);
this.emit('close');
if (this.testClient) {
await this.testClient.close();
}
}
_handleSocket(socket) {
const options = {
socket,
protocol: this.protocol,
logger: this.logger,
proto: this.proto,
classMap: this.classMap,
maxIdleTime: this.options.maxIdleTime,
disableDecodeCache: this.options.disableDecodeCache,
};
if (this.options.classCacheClass) {
// 每一个 connection 实例化一个 classCache
options.classCache = new this.options.classCacheClass();
}
const conn = new this.connectionClass(options);
const key = conn.remoteAddress;
this._connections.set(key, conn);
conn.on('request', req => {
this._handleRequest(req, conn).catch(err => {
const { packetId, packetType, className, options, meta, ctx } = req;
const { targetAppName, methodName, serverSignature, interfaceName, methodArgSigs, requestProps } = req.data || {};
err.req = {
packetId, packetType, className, options, meta, ctx,
data: { targetAppName, methodName, serverSignature, interfaceName, methodArgSigs, requestProps },
};
this.emit('error', err);
});
});
conn.once('close', () => { this._connections.delete(key); });
this.emit('connection', conn);
}
createContext(/* req, res*/) {
return null;
}
async _handleRequest(req, conn) {
const id = req.data.serverSignature;
req.data.interfaceName = req.data.interfaceName || req.data.serverSignature.split(':')[0];
const service = this._services.get(id);
const res = new this.responseClass(req, conn);
const ctx = this.createContext(req, res);
this.emit('request', { req, ctx });
try {
if (!service) {
throw new Error('not found service: ' + id);
}
await service.invoke(ctx, req, res);
} catch (e) {
this.emit('error', e);
res.meta.resultCode = '02';
await res.send({
isError: true,
errorMsg: e.message,
appResponse: null,
});
} finally {
this.emit('response', { ctx, req, res });
}
}
_handleUncaughtError() {
this.logger.warn('[RpcServer] server is down, cause by uncaughtException in this process %s', process.pid);
this.unPublish();
}
}
module.exports = RpcServer;