mirror of
https://github.com/sofastack/sofa-rpc-node.git
synced 2025-12-08 20:26:00 +00:00
134 lines
3.6 KiB
JavaScript
134 lines
3.6 KiB
JavaScript
'use strict';
|
|
|
|
const http2 = require('http2');
|
|
const assert = require('assert');
|
|
const qs = require('querystring');
|
|
const RpcServer = require('../server');
|
|
const GRpcResponse = require('./response');
|
|
const { GRpcClient } = require('../../client');
|
|
const ProtoUtil = require('../../util/proto_util');
|
|
|
|
const {
|
|
HTTP2_HEADER_PATH,
|
|
} = http2.constants;
|
|
|
|
const units = {
|
|
m: 1,
|
|
S: 1000,
|
|
M: 60 * 1000,
|
|
H: 60 * 60 * 1000,
|
|
};
|
|
|
|
const _testClient = Symbol.for('RpcServer#testClient');
|
|
const defaultOptions = {
|
|
responseClass: GRpcResponse,
|
|
};
|
|
|
|
class GRpcServer extends RpcServer {
|
|
constructor(options = {}) {
|
|
assert(options.proto, '[GRpcServer] options.proto is required');
|
|
super(Object.assign({}, defaultOptions, options));
|
|
}
|
|
|
|
get testClient() {
|
|
if (!this[_testClient]) {
|
|
this[_testClient] = new GRpcClient({
|
|
logger: this.logger,
|
|
proto: this.proto,
|
|
});
|
|
}
|
|
return this[_testClient];
|
|
}
|
|
|
|
get url() {
|
|
// uniqueId=&version=1.0&timeout=0&delay=-1&id=rpc-cfg-0&dynamic=true&weight=100&accepts=100000&startTime=1526050447423&pid=13862&language=java&rpcVer=50400
|
|
return 'http://' + this.publishAddress + ':' + this.publishPort + '?' + qs.stringify(this.params);
|
|
}
|
|
|
|
_startServer(port) {
|
|
const server = http2.createServer();
|
|
server.once('error', err => { this.emit('error', err); });
|
|
server.on('session', session => { this._handleSocket(session); });
|
|
server.on('stream', (stream, headers) => { this._handleStream(stream, headers); });
|
|
server.listen(port, () => {
|
|
const realPort = server.address().port;
|
|
if (port === this.publishPort && port === 0) {
|
|
this.publishPort = realPort;
|
|
}
|
|
this.logger.info('[RpcServer] server start on %s', realPort);
|
|
});
|
|
return server;
|
|
}
|
|
|
|
_handleSocket(session) {
|
|
const socket = session.socket;
|
|
const key = socket.remoteAddress + ':' + socket.remotePort;
|
|
this._connections.set(key, session);
|
|
session.once('close', () => { this._connections.delete(key); });
|
|
this.emit('connection', session);
|
|
}
|
|
|
|
_handleStream(stream, headers) {
|
|
const path = headers[HTTP2_HEADER_PATH];
|
|
const version = headers['grpc-version'] || '1.0';
|
|
const arr = path.split('/');
|
|
const interfaceName = arr[1];
|
|
const methodName = arr[2];
|
|
let timeout = headers['grpc-timeout'];
|
|
if (timeout && timeout.length > 1) {
|
|
timeout = Number(timeout.slice(0, -1)) * units[timeout.slice(-1)];
|
|
}
|
|
const serverSignature = interfaceName + ':' + version;
|
|
const methodInfo = ProtoUtil.getMethodInfo(this.proto, interfaceName, methodName);
|
|
const req = {
|
|
data: {
|
|
serverSignature,
|
|
interfaceName,
|
|
methodName,
|
|
args: [],
|
|
},
|
|
options: {
|
|
codecType: 'protobuf',
|
|
timeout,
|
|
},
|
|
meta: {
|
|
size: 0,
|
|
},
|
|
};
|
|
|
|
let buf = null;
|
|
stream.on('data', data => {
|
|
if (buf) {
|
|
buf = Buffer.concat([ buf, data ]);
|
|
} else {
|
|
buf = data;
|
|
}
|
|
|
|
const total = buf.length;
|
|
if (total < 5) return;
|
|
|
|
const bodySize = buf.readUInt32BE(1);
|
|
if (total < bodySize + 5) return;
|
|
|
|
const msg = buf.slice(5, bodySize + 5);
|
|
if (methodInfo.requestType) {
|
|
const requestType = methodInfo.resolvedRequestType;
|
|
const arg = requestType.decode(msg);
|
|
req.data.args.push(arg);
|
|
}
|
|
req.meta.size = total;
|
|
});
|
|
stream.on('end', () => {
|
|
this._handleRequest(req, {
|
|
stream,
|
|
methodInfo,
|
|
}).catch(err => { this.emit('error', err); });
|
|
});
|
|
stream.on('error', err => {
|
|
this.logger.warn(err);
|
|
});
|
|
}
|
|
}
|
|
|
|
module.exports = GRpcServer;
|