mirror of
https://github.com/sofastack/sofa-rpc-node.git
synced 2025-12-08 20:26:00 +00:00
* egg的rpc.server支持metaKey配置,用于支持自定义元数据; Co-authored-by: Changeden <chenzhiduan@unizone.tech>
200 lines
5.7 KiB
JavaScript
200 lines
5.7 KiB
JavaScript
'use strict';
|
|
|
|
const is = require('is-type-of');
|
|
const Base = require('sdk-base');
|
|
const assert = require('assert');
|
|
const { URL } = require('url');
|
|
|
|
class RpcService extends Base {
|
|
constructor(options = {}) {
|
|
assert(options.interfaceName, '[RpcService] options.interfaceName is required');
|
|
super(options);
|
|
|
|
this.appName = options.appName;
|
|
this.apiMeta = options.apiMeta;
|
|
this.delegate = options.delegate;
|
|
this.interfaceName = options.interfaceName;
|
|
this.version = options.version;
|
|
this.uniqueId = options.uniqueId;
|
|
this.group = options.group;
|
|
// 获取自定义元数据
|
|
// get custom metadata
|
|
this.customMeta = typeof options.customMeta === 'object' ? options.customMeta : {};
|
|
|
|
this.id = this.uniqueId ?
|
|
this.interfaceName + ':' + this.version + ':' + this.uniqueId :
|
|
this.interfaceName + ':' + this.version;
|
|
|
|
const { methods = [], classMaps } = this.apiMeta || {};
|
|
this.returnTypes = {};
|
|
this.classMaps = classMaps;
|
|
|
|
for (const method of methods) {
|
|
this.returnTypes[method.name] = {
|
|
$class: method.returnType,
|
|
generic: method.generic,
|
|
};
|
|
}
|
|
this.ready(true);
|
|
}
|
|
|
|
get app() {
|
|
return this.options.app;
|
|
}
|
|
|
|
get logger() {
|
|
return this.options.logger;
|
|
}
|
|
|
|
get registry() {
|
|
return this.options.registry;
|
|
}
|
|
|
|
normalizeReg(urlStr) {
|
|
const url = new URL(urlStr);
|
|
url.searchParams.set('interface', this.interfaceName);
|
|
url.searchParams.set('version', this.version);
|
|
url.searchParams.set('group', this.group);
|
|
// 写入自定义元数据
|
|
// append custom metadata to searchParams
|
|
let customMeta = {};
|
|
const blockList = [ 'interface', 'version', 'group' ];
|
|
Object.keys(this.customMeta).forEach(metaKey => {
|
|
const value = this.customMeta[metaKey];
|
|
if (!(typeof value === 'undefined' || blockList.includes(metaKey))) {
|
|
url.searchParams.set(metaKey, value);
|
|
const source = { [`${metaKey}`]: value };
|
|
customMeta = Object.assign(customMeta, source);
|
|
}
|
|
});
|
|
const reg = {
|
|
interfaceName: this.interfaceName,
|
|
version: this.version,
|
|
group: this.group,
|
|
url: url.toString(),
|
|
};
|
|
return Object.assign(reg, customMeta);
|
|
}
|
|
|
|
/**
|
|
* 发布到注册中心
|
|
*
|
|
* @param {String} url - 发布的 url
|
|
* @return {Promise} promise
|
|
*/
|
|
publish(url) {
|
|
if (!this.registry) return Promise.resolve();
|
|
|
|
const reg = this.normalizeReg(url);
|
|
this.publishUrl = reg.url;
|
|
return this.registry.register(reg)
|
|
.then(() => {
|
|
return new Promise(resolve => {
|
|
const listener = addressList => {
|
|
const exists = addressList.some(addr => {
|
|
return new URL(addr).href.includes(url);
|
|
});
|
|
if (exists) {
|
|
this.registry.unSubscribe(reg, listener);
|
|
resolve();
|
|
}
|
|
};
|
|
|
|
this.registry.subscribe(reg, listener);
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* 取消发布
|
|
* @return {void}
|
|
*/
|
|
unPublish() {
|
|
if (!this.registry || !this.publishUrl) return Promise.resolve();
|
|
|
|
const reg = this.normalizeReg(this.publishUrl);
|
|
return this.registry.unRegister(reg);
|
|
}
|
|
|
|
convertResult(req, result) {
|
|
// protobuf 不需要在这里转换
|
|
if (req.options.codecType === 'protobuf' || result == null) return result;
|
|
|
|
if (result instanceof Error) {
|
|
return {
|
|
$class: 'java.lang.Exception',
|
|
$: result,
|
|
};
|
|
}
|
|
const methodName = req.data.methodName;
|
|
const returnType = this.returnTypes[methodName];
|
|
// 如果 apiMeta 信息不足,或者 result 已经是 java 形式了,直接返回
|
|
if ((result.$class) || !returnType) {
|
|
return result;
|
|
}
|
|
return {
|
|
$class: returnType.$class,
|
|
$: result,
|
|
generic: returnType.generic,
|
|
};
|
|
}
|
|
|
|
async invoke(ctx, req, res) {
|
|
const methodName = req.data.methodName;
|
|
const args = req.data.args;
|
|
const timeout = req.options.timeout;
|
|
const data = {
|
|
isError: false,
|
|
errorMsg: null,
|
|
appResponse: null,
|
|
classMap: this.classMaps,
|
|
};
|
|
// 外部中间件可以在请求的时候, 拦截, 注入新的 method.
|
|
const method = req.method || this.delegate[methodName];
|
|
if (!is.asyncFunction(method)) {
|
|
res.meta.resultCode = '04'; // 路由失败
|
|
data.isError = true;
|
|
data.errorMsg = 'Can not find method: ' + this.id + '#' + methodName + '()';
|
|
} else {
|
|
let result;
|
|
try {
|
|
result = await method.apply(ctx, args);
|
|
} catch (err) {
|
|
data.isError = true;
|
|
data.errorMsg = err.message;
|
|
// 如果框架自身抛出的异常, error name 可以定义为 SystemError, 方便甄别
|
|
// 这个异常直接放到协议头上
|
|
if (err.name === 'SystemError') {
|
|
res.meta.resultCode = err.resultCode || '02';
|
|
} else {
|
|
// 业务异常
|
|
res.meta.resultCode = err.resultCode || '01';
|
|
}
|
|
// 都把 err 返回
|
|
result = err;
|
|
if (!err.ignoreLog) {
|
|
this.logger.error(err);
|
|
}
|
|
}
|
|
data.appResponse = result;
|
|
}
|
|
res.meta.rt = Date.now() - res.meta.start;
|
|
|
|
if (timeout && res.meta.rt >= timeout) {
|
|
this.logger.warn('[RpcService] service: %s#%s() process timeout, rt: %s, timeout: %s', this.id, methodName, res.meta.rt, timeout);
|
|
res.meta.resultCode = '03';
|
|
return;
|
|
}
|
|
if (res.isClosed) {
|
|
this.logger.warn('[RpcService] client maybe closed before sending response, remote address: %s', res.remoteAddress);
|
|
return;
|
|
}
|
|
|
|
// 类型转换 js => java
|
|
data.appResponse = this.convertResult(req, data.appResponse);
|
|
await res.send(data);
|
|
}
|
|
}
|
|
|
|
module.exports = RpcService;
|