sofa-rpc-node/lib/server/service.js
ChangedenChan 1560625421
egg的rpc.server支持customMeta配置,用于支持自定义元数据; (#67)
* egg的rpc.server支持metaKey配置,用于支持自定义元数据;

Co-authored-by: Changeden <chenzhiduan@unizone.tech>
2020-06-15 09:14:33 +08:00

200 lines
5.7 KiB
JavaScript

'use strict';
const is = require('is-type-of');
const Base = require('sdk-base');
const assert = require('assert');
const { URL } = require('url');
/**
 * A single RPC service exposed by the server.
 *
 * It wraps the business `delegate` object, builds the registration record
 * used to publish the service to a registry, and dispatches incoming requests
 * to the matching delegate method.
 */
class RpcService extends Base {
  /**
   * @param {Object} options
   *   - {String} interfaceName - interface name of the service (required)
   *   - {String} [version] - service version, part of the service id
   *   - {String} [uniqueId] - appended to the service id when truthy
   *   - {String} [group] - group written into the registration record
   *   - {String} [appName] - stored as-is on the instance
   *   - {Object} [apiMeta] - `{ methods, classMaps }`; each method entry
   *     provides `name`, `returnType` and `generic`
   *   - {Object} [delegate] - object whose async methods implement the service
   *   - {Object} [customMeta] - extra key/value pairs appended to the
   *     registration; anything that is not a non-null object is ignored
   *   - {Object} [app], [logger], [registry] - read lazily through the getters
   */
  constructor(options = {}) {
    assert(options.interfaceName, '[RpcService] options.interfaceName is required');
    super(options);

    this.appName = options.appName;
    this.apiMeta = options.apiMeta;
    this.delegate = options.delegate;
    this.interfaceName = options.interfaceName;
    this.version = options.version;
    this.uniqueId = options.uniqueId;
    this.group = options.group;

    // Custom metadata. `typeof null` is also 'object', so null has to be
    // excluded explicitly; otherwise Object.keys() in normalizeReg() throws.
    const customMeta = options.customMeta;
    this.customMeta = customMeta !== null && typeof customMeta === 'object' ? customMeta : {};

    this.id = this.uniqueId ?
      `${this.interfaceName}:${this.version}:${this.uniqueId}` :
      `${this.interfaceName}:${this.version}`;

    // Index the declared return type of every method by method name so that
    // convertResult() can wrap plain JS results.
    const { methods = [], classMaps } = this.apiMeta || {};
    this.returnTypes = {};
    this.classMaps = classMaps;
    for (const method of methods) {
      this.returnTypes[method.name] = {
        $class: method.returnType,
        generic: method.generic,
      };
    }

    this.ready(true);
  }

  // The getters below read from `this.options`, which is presumably populated
  // by the Base constructor from the object passed to super() — confirm
  // against sdk-base.

  get app() {
    return this.options.app;
  }

  get logger() {
    return this.options.logger;
  }

  get registry() {
    return this.options.registry;
  }

  /**
   * Build the registration record for the given url.
   *
   * `interface`, `version` and `group` are written into the url's query
   * string, followed by every custom metadata entry whose value is not
   * undefined and whose key is not one of those three reserved names.
   *
   * @param {String} urlStr - the service url
   * @return {Object} `{ interfaceName, version, group, url }` merged with the
   *   accepted custom metadata entries
   */
  normalizeReg(urlStr) {
    const url = new URL(urlStr);
    url.searchParams.set('interface', this.interfaceName);
    url.searchParams.set('version', this.version);
    url.searchParams.set('group', this.group);

    // Append custom metadata to the query string; reserved keys are skipped so
    // they cannot override the values written above.
    const reservedKeys = [ 'interface', 'version', 'group' ];
    const extraMeta = {};
    for (const metaKey of Object.keys(this.customMeta)) {
      const value = this.customMeta[metaKey];
      if (typeof value === 'undefined' || reservedKeys.includes(metaKey)) continue;
      url.searchParams.set(metaKey, value);
      extraMeta[metaKey] = value;
    }

    const reg = {
      interfaceName: this.interfaceName,
      version: this.version,
      group: this.group,
      url: url.toString(),
    };
    // NOTE(review): a custom key named `url` or `interfaceName` is not in the
    // reserved list and therefore overwrites the field above — confirm that
    // this is intended.
    return Object.assign(reg, extraMeta);
  }

  /**
   * Publish the service to the registry.
   *
   * After registering, the service subscribes to its own registration and
   * resolves once a pushed address list contains an address whose normalized
   * href includes `url`; the listener is then unsubscribed. There is no
   * timeout: the promise stays pending until such an address shows up.
   *
   * @param {String} url - the url to publish
   * @return {Promise} resolves immediately when no registry is configured
   */
  publish(url) {
    if (!this.registry) return Promise.resolve();

    const reg = this.normalizeReg(url);
    // Remembered so that unPublish() can rebuild the same registration.
    this.publishUrl = reg.url;

    return this.registry.register(reg)
      .then(() => {
        return new Promise(resolve => {
          const listener = addressList => {
            const exists = addressList.some(addr => {
              return new URL(addr).href.includes(url);
            });
            if (exists) {
              this.registry.unSubscribe(reg, listener);
              resolve();
            }
          };
          this.registry.subscribe(reg, listener);
        });
      });
  }

  /**
   * Remove the service from the registry.
   *
   * @return {Promise} resolves immediately when there is no registry or the
   *   service was never published; otherwise the result of
   *   `registry.unRegister()`
   */
  unPublish() {
    if (!this.registry || !this.publishUrl) return Promise.resolve();
    const reg = this.normalizeReg(this.publishUrl);
    return this.registry.unRegister(reg);
  }

  /**
   * Convert a JS result into the `{ $class, $ }` java-style form.
   *
   * @param {Object} req - the request; reads `options.codecType` and
   *   `data.methodName`
   * @param {*} result - value returned (or error thrown) by the delegate
   * @return {*} the wrapped result, or `result` untouched when no conversion
   *   applies
   */
  convertResult(req, result) {
    // protobuf results are not converted here; null / undefined pass through.
    if (req.options.codecType === 'protobuf' || result == null) return result;

    if (result instanceof Error) {
      return {
        $class: 'java.lang.Exception',
        $: result,
      };
    }

    const methodName = req.data.methodName;
    const returnType = this.returnTypes[methodName];
    // Return as-is when apiMeta does not describe this method, or when the
    // result is already in java form.
    if ((result.$class) || !returnType) {
      return result;
    }
    return {
      $class: returnType.$class,
      $: result,
      generic: returnType.generic,
    };
  }

  /**
   * Invoke the requested method and send the response.
   *
   * Result codes written to `res.meta.resultCode`: '04' method not found
   * (routing failure), '02' error named SystemError, '01' any other error
   * (business error), '03' processing took at least `req.options.timeout` ms.
   * A `resultCode` set on the thrown error takes precedence over '01' / '02'.
   * Nothing is sent when the call timed out or `res.isClosed` is truthy.
   *
   * @param {Object} ctx - used as `this` for the invoked method
   * @param {Object} req - the request (`data.methodName`, `data.args`,
   *   `options.timeout`, optional `method`)
   * @param {Object} res - the response (`meta`, `isClosed`, `remoteAddress`,
   *   `send()`)
   * @return {Promise<void>} settles after the response has been handled
   */
  async invoke(ctx, req, res) {
    const methodName = req.data.methodName;
    const args = req.data.args;
    const timeout = req.options.timeout;
    const data = {
      isError: false,
      errorMsg: null,
      appResponse: null,
      classMap: this.classMaps,
    };

    // An external middleware may intercept the request and inject its own
    // method through `req.method`.
    const method = req.method || this.delegate[methodName];
    if (!is.asyncFunction(method)) {
      res.meta.resultCode = '04'; // routing failure
      data.isError = true;
      data.errorMsg = `Can not find method: ${this.id}#${methodName}()`;
    } else {
      let result;
      try {
        result = await method.apply(ctx, args);
      } catch (err) {
        data.isError = true;
        data.errorMsg = err.message;
        // Errors thrown by the framework itself can be named SystemError to
        // tell them apart; the result code goes into the protocol header.
        if (err.name === 'SystemError') {
          res.meta.resultCode = err.resultCode || '02';
        } else {
          // business error
          res.meta.resultCode = err.resultCode || '01';
        }
        // The error is returned as the response in both cases.
        result = err;
        if (!err.ignoreLog) {
          this.logger.error(err);
        }
      }
      data.appResponse = result;
    }

    res.meta.rt = Date.now() - res.meta.start;
    if (timeout && res.meta.rt >= timeout) {
      this.logger.warn('[RpcService] service: %s#%s() process timeout, rt: %s, timeout: %s', this.id, methodName, res.meta.rt, timeout);
      res.meta.resultCode = '03';
      return;
    }
    if (res.isClosed) {
      this.logger.warn('[RpcService] client maybe closed before sending response, remote address: %s', res.remoteAddress);
      return;
    }

    // type conversion js => java
    data.appResponse = this.convertResult(req, data.appResponse);
    await res.send(data);
  }
}
// Expose the RpcService class as this module's export.
module.exports = RpcService;