sofa-rpc-node/lib/server/service.js

169 lines
4.5 KiB
JavaScript

'use strict';
const is = require('is-type-of');
const Base = require('sdk-base');
const assert = require('assert');
const { URL } = require('url');
class RpcService extends Base {
constructor(options = {}) {
assert(options.interfaceName, '[RpcService] options.interfaceName is required');
super(options);
this.appName = options.appName;
this.apiMeta = options.apiMeta;
this.delegate = options.delegate;
this.interfaceName = options.interfaceName;
this.version = options.version;
this.uniqueId = options.uniqueId;
this.group = options.group;
this.id = this.uniqueId ?
this.interfaceName + ':' + this.version + ':' + this.uniqueId :
this.interfaceName + ':' + this.version;
const { methods = [], classMaps } = this.apiMeta || {};
this.returnTypes = {};
this.classMaps = classMaps;
for (const method of methods) {
this.returnTypes[method.name] = {
$class: method.returnType,
generic: method.generic,
};
}
this.ready(true);
}
get app() {
return this.options.app;
}
get logger() {
return this.options.logger;
}
get registry() {
return this.options.registry;
}
normalizeReg(urlStr) {
const url = new URL(urlStr);
url.searchParams.set('interface', this.interfaceName);
url.searchParams.set('version', this.version);
url.searchParams.set('group', this.group);
const reg = {
interfaceName: this.interfaceName,
version: this.version,
group: this.group,
url: url.toString(),
};
return reg;
}
/**
* 发布到注册中心
*
* @param {String} url - 发布的 url
* @return {Promise} promise
*/
publish(url) {
if (!this.registry) return Promise.resolve();
const reg = this.normalizeReg(url);
this.publishUrl = reg.url;
return this.registry.register(reg);
}
/**
* 取消发布
* @return {void}
*/
unPublish() {
if (!this.registry || !this.publishUrl) return Promise.resolve();
const reg = this.normalizeReg(this.publishUrl);
return this.registry.unRegister(reg);
}
convertResult(req, result) {
// protobuf 不需要在这里转换
if (req.options.codecType === 'protobuf' || result == null) return result;
if (result instanceof Error) {
return {
$class: 'java.lang.Exception',
$: result,
};
}
const methodName = req.data.methodName;
const returnType = this.returnTypes[methodName];
// 如果 apiMeta 信息不足,或者 result 已经是 java 形式了,直接返回
if ((result.$class) || !returnType) {
return result;
}
return {
$class: returnType.$class,
$: result,
generic: returnType.generic,
};
}
async invoke(ctx, req, res) {
const methodName = req.data.methodName;
const args = req.data.args;
const timeout = req.options.timeout;
const data = {
isError: false,
errorMsg: null,
appResponse: null,
classMap: this.classMaps,
};
// 外部中间件可以在请求的时候, 拦截, 注入新的 method.
const method = req.method || this.delegate[methodName];
if (!is.asyncFunction(method)) {
res.meta.resultCode = '04'; // 路由失败
data.isError = true;
data.errorMsg = 'Can not find method: ' + this.id + '#' + methodName + '()';
} else {
let result;
try {
result = await method.apply(ctx, args);
} catch (err) {
// 如果框架自身抛出的异常, error name 可以定义为 SystemError, 方便甄别
// 这个异常直接放到协议头上
if (err.name === 'SystemError') {
res.meta.resultCode = err.resultCode || '02';
data.errorMsg = err.message;
} else {
// 业务异常
res.meta.resultCode = err.resultCode || '01';
}
// 都把 err 返回
result = err;
if (!err.ignoreLog) {
this.logger.error(err);
}
}
data.appResponse = result;
}
res.meta.rt = Date.now() - res.meta.start;
if (timeout && res.meta.rt >= timeout) {
this.logger.warn('[RpcService] service: %s#%s() process timeout, rt: %s, timeout: %s', this.id, methodName, res.meta.rt, timeout);
res.meta.resultCode = '03';
return;
}
if (res.isClosed) {
this.logger.warn('[RpcService] client maybe closed before sending response, remote address: %s', res.remoteAddress);
return;
}
// 类型转换 js => java
data.appResponse = this.convertResult(req, data.appResponse);
await res.send(data);
}
}
module.exports = RpcService;