mirror of
https://github.com/sofastack/sofa-rpc-node.git
synced 2026-01-18 15:55:54 +00:00
315 lines
9.0 KiB
JavaScript
315 lines
9.0 KiB
JavaScript
'use strict';
|
|
|
|
const net = require('net');
|
|
const pump = require('pump');
|
|
const assert = require('assert');
|
|
const Base = require('sdk-base');
|
|
const utils = require('../utils');
|
|
const protocol = require('sofa-bolt-node');
|
|
const CircuitBreaker = require('../circuit_breaker');
|
|
|
|
const defaultOptions = {
|
|
protocol,
|
|
noDelay: true, // 默认禁用 Nagle 算法
|
|
maxIdleTime: 30000, // 最大空闲时间
|
|
connectTimeout: 3000, // 连接超时时长
|
|
};
|
|
|
|
class RpcConnection extends Base {
|
|
constructor(options) {
|
|
assert(options.logger, '[RpcConnection] options.logger is required');
|
|
assert(options.address, '[RpcConnection] options.address is required');
|
|
super(Object.assign({}, defaultOptions, options));
|
|
|
|
this._closed = false;
|
|
this._isClosing = false;
|
|
this._connected = false;
|
|
this._lastError = null;
|
|
this._sentReqs = new Map();
|
|
this._lastActiveTime = -1; // 上次和服务器通讯的时间(包括心跳)
|
|
this._lastInvokeTime = -1; // 上次调用服务器的时间
|
|
this._key = 'RpcConnection@' + this.address.host;
|
|
// 熔断器
|
|
this._circuitBreaker = CircuitBreaker.getInstance(this.key);
|
|
this._connect();
|
|
}
|
|
|
|
get key() {
|
|
return this._key;
|
|
}
|
|
|
|
get url() {
|
|
return this.address.href;
|
|
}
|
|
|
|
get address() {
|
|
return this.options.address;
|
|
}
|
|
|
|
get protocol() {
|
|
return this.options.protocol;
|
|
}
|
|
|
|
get logger() {
|
|
return this.options.logger;
|
|
}
|
|
|
|
get isOk() {
|
|
return this._connected && !this._closed;
|
|
}
|
|
|
|
get isClosed() {
|
|
return this._closed;
|
|
}
|
|
|
|
get isConnected() {
|
|
return this._connected;
|
|
}
|
|
|
|
get lastInvokeTime() {
|
|
return this._lastInvokeTime;
|
|
}
|
|
|
|
get latestHealthCount() {
|
|
return this._circuitBreaker.latestHealthCount;
|
|
}
|
|
|
|
_connect() {
|
|
this._socket = net.connect(Number(this.address.port), this.address.hostname);
|
|
this._socket.setNoDelay(this.options.noDelay);
|
|
this._socket.setTimeout(this.options.connectTimeout, () => {
|
|
const err = new Error('socket#' + this.url + ' connect timeout(' + this.options.connectTimeout + 'ms)');
|
|
err.name = 'RpcSocketConnectTimeoutError';
|
|
this.close(err);
|
|
this.ready(err);
|
|
});
|
|
this._socket.once('close', () => { this._handleClose(); });
|
|
this._socket.once('error', err => { this._handleSocketError(err); });
|
|
this._socket.once('connect', () => {
|
|
this._connected = true;
|
|
this._socket.setTimeout(0);
|
|
this._clientUrl = this._socket.localAddress + ':' + this._socket.localPort;
|
|
this.ready(true);
|
|
});
|
|
|
|
const encodeOpts = {
|
|
sentReqs: this._sentReqs,
|
|
classCache: this.options.classCache || new Map(),
|
|
address: this.address,
|
|
logger: this.options.logger,
|
|
};
|
|
const decodeOpts = {
|
|
sentReqs: this._sentReqs,
|
|
classCache: this.options.classCache || new Map(),
|
|
address: this.address,
|
|
logger: this.options.logger,
|
|
};
|
|
|
|
// 针对 classMap 针对 hessian, proto 针对 protobuf
|
|
// 因为可能需要同时支持,所以分开设置(一般通过全局设置,所以这里要判断是否有值)
|
|
if (this.options.classMap) {
|
|
encodeOpts.classMap = this.options.classMap;
|
|
decodeOpts.classMap = this.options.classMap;
|
|
}
|
|
if (this.options.proto) {
|
|
encodeOpts.proto = this.options.proto;
|
|
decodeOpts.proto = this.options.proto;
|
|
}
|
|
|
|
encodeOpts.classCache.enableCompile = true;
|
|
decodeOpts.classCache.enableCompile = true;
|
|
if (this.options.disableDecodeCache) {
|
|
decodeOpts.classCache = null;
|
|
}
|
|
|
|
this._encoder = this.protocol.encoder(encodeOpts);
|
|
this._decoder = this.protocol.decoder(decodeOpts);
|
|
this._decoder.on('response', res => { this._handleResponse(res); });
|
|
// @refer https://nodejs.org/en/docs/guides/backpressuring-in-streams/
|
|
pump(this._encoder, this._socket, this._decoder);
|
|
}
|
|
|
|
/**
|
|
* 重置计数器
|
|
*/
|
|
resetCounter() {
|
|
this._circuitBreaker.reset();
|
|
}
|
|
|
|
/**
|
|
* 心跳,用户维护长连接
|
|
*/
|
|
heartbeat() {
|
|
// 长时间空闲需要发心跳来续命(服务端 90s 空闲强制断开)
|
|
if ((Date.now() - this._lastActiveTime) > this.options.maxIdleTime) {
|
|
this._lastActiveTime = Date.now();
|
|
const id = utils.nextId();
|
|
// 只发就好,不用等 ack 了
|
|
this._encoder.writeHeartbeat(id, {
|
|
timeout: 3000,
|
|
clientUrl: this._clientUrl,
|
|
});
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 泛化调用 rpc 接口
|
|
*
|
|
* @param {Object} req - 请求对象
|
|
* @return {Promise} 结果
|
|
*/
|
|
async invoke(req) {
|
|
assert(req && req.timeout, '[RpcConnection] req.timeout is required');
|
|
this._lastActiveTime = this._lastInvokeTime = Date.now();
|
|
const id = utils.nextId();
|
|
req.meta.id = id;
|
|
req.meta.address = this.address;
|
|
|
|
const resPromise = this.await('response_' + id);
|
|
const timer = setTimeout(() => {
|
|
const rt = Date.now() - req.meta.start;
|
|
const err = new Error('no response in ' + rt + 'ms, address:' + this.url);
|
|
err.name = 'RpcResponseTimeoutError';
|
|
err.req = req;
|
|
err.timeout = req.timeout;
|
|
req.meta.resultCode = '03'; // 超时
|
|
this._handleRpcError(id, err);
|
|
}, req.timeout);
|
|
this._sentReqs.set(id, { req, resPromise, timer });
|
|
|
|
// 如果目前状态为熔断,则直接返回错误
|
|
if (!this._circuitBreaker.allowRequest()) {
|
|
const hc = this._circuitBreaker.latestHealthCount;
|
|
const err = new Error('this request is block by circuit breaker, ' + hc.toString() + ', url: ' + this.url);
|
|
err.name = 'RpcCircuitBreakerError';
|
|
req.meta.resultCode = '02';
|
|
// 保证异步返回
|
|
process.nextTick(() => { this._handleRpcError(id, err); });
|
|
} else {
|
|
this._encoder.writeRequest(id, req, (err, packet) => {
|
|
if (packet && packet.meta) {
|
|
req.meta.codecType = packet.meta.codecType;
|
|
req.meta.boltVersion = packet.meta.boltVersion;
|
|
req.meta.sofaVersion = packet.meta.sofaVersion;
|
|
req.meta.crcEnable = packet.meta.crcEnable;
|
|
req.meta.reqSize = packet.meta.size;
|
|
req.meta.requestEncodeStart = packet.meta.start;
|
|
req.meta.requestEncodeRT = packet.meta.encodeRT;
|
|
}
|
|
if (err) {
|
|
err.name = 'RpcRequestEncodeError';
|
|
req.meta.resultCode = '02';
|
|
// 保证异步返回
|
|
process.nextTick(() => { this._handleRpcError(id, err); });
|
|
}
|
|
});
|
|
}
|
|
const res = await resPromise;
|
|
req.meta.responseDecodeStart = res.meta.start;
|
|
req.meta.responseDecodeRT = res.meta.rt;
|
|
req.meta.resSize = res.meta.size;
|
|
req.meta.rt = Date.now() - req.meta.start;
|
|
|
|
this._circuitBreaker.update(req.meta);
|
|
return res.data;
|
|
}
|
|
|
|
_handleResponse(res) {
|
|
const id = res.packetId;
|
|
const reqInfo = this._sentReqs.get(id);
|
|
if (reqInfo) {
|
|
clearTimeout(reqInfo.timer);
|
|
this._sentReqs.delete(id);
|
|
this.emit('response_' + id, res);
|
|
} else {
|
|
this.logger.warn('[RpcConnection] can not find invoke request for response: %j, maybe it\'s timeout.', res);
|
|
}
|
|
}
|
|
|
|
_handleRpcError(id, err) {
|
|
this._handleResponse({
|
|
packetId: id,
|
|
packetType: 'response',
|
|
data: { error: err, appResponse: null },
|
|
meta: { size: 0, start: 0, rt: 0 },
|
|
});
|
|
}
|
|
|
|
_handleSocketError(err) {
|
|
if (!this.isConnected) {
|
|
this.ready(err);
|
|
return;
|
|
}
|
|
if (this._closed || this._isClosing) {
|
|
this.logger.warn('[RpcConnection] socket#%s error after closed, cause by %s', this.url, err.message);
|
|
return;
|
|
}
|
|
if (err.code === 'ECONNRESET') {
|
|
// 切流的时候可能导致大量不必要的异常日志
|
|
this.logger.warn('[RpcConnection] socket#%s ECONNRESET', this.url);
|
|
} else {
|
|
err.name = 'RpcSocketError';
|
|
err.message += ' (address: ' + this.url + ')';
|
|
this.emit('error', err);
|
|
}
|
|
}
|
|
|
|
_cleanRequest(err) {
|
|
if (!err) {
|
|
err = new Error('The socket was closed. (address: address => ' + this.url + ')');
|
|
err.name = 'RpcSocketCloseError';
|
|
err.resultCode = '02';
|
|
}
|
|
for (const id of this._sentReqs.keys()) {
|
|
this._handleResponse({
|
|
packetId: id,
|
|
packetType: 'response',
|
|
data: { error: err, appResponse: null },
|
|
meta: { size: 0, start: 0, rt: 0 },
|
|
});
|
|
}
|
|
}
|
|
|
|
_handleClose() {
|
|
this._closed = true;
|
|
this._isClosing = false;
|
|
if (this._lastError) {
|
|
this.emit('error', this._lastError);
|
|
}
|
|
this._cleanRequest(this._lastError);
|
|
this._circuitBreaker.close();
|
|
// 最后触发 close 事件
|
|
this.emit('close');
|
|
}
|
|
|
|
/**
|
|
* 强制关闭连接,不等正在进行中的请求
|
|
*
|
|
* @param {Error} [err] - 关闭的异常
|
|
*/
|
|
async forceClose(err) {
|
|
if (this._closed) {
|
|
return;
|
|
}
|
|
this._lastError = err;
|
|
this._socket.destroy();
|
|
await this.await('close');
|
|
}
|
|
|
|
/**
|
|
* 手动关闭连接
|
|
*
|
|
* @param {Error} [err] - 关闭的异常
|
|
*/
|
|
async close(err) {
|
|
this._isClosing = true;
|
|
// 等待 pending 的请求结束
|
|
await Promise.all(
|
|
Array.from(this._sentReqs.values()).map(data => data.resPromise)
|
|
);
|
|
await this.forceClose(err);
|
|
}
|
|
}
|
|
|
|
module.exports = RpcConnection;
|