var thinkHttp = thinkRequire('Http');
var url = require('url');
var websocket = require('websocket-driver');

/**
 * WebSocket endpoint attached to an existing HTTP server.
 *
 * It listens for HTTP `upgrade` requests, validates the Origin header,
 * performs the handshake through `websocket-driver`, and dispatches each
 * incoming message either to the handler configured under
 * `websocket_message_handle` or to the built-in JSON-RPC 2.0 handler.
 */
var WebSocket = module.exports = Class(function(){
  'use strict';

  /**
   * Id assigned to the next upgraded socket; incremented per connection.
   * @type {Number}
   */
  var socketId = 1000;

  return {
    /**
     * Store the collaborators used by the other methods.
     * @param  {Object} httpServer server whose `upgrade` event is listened to in run()
     * @param  {Object} app        object exposing `listener(http)`
     * @return {void}
     */
    init: function(httpServer, app){
      this.httpServer = httpServer;
      this.app = app;
    },

    /**
     * Check whether the request origin is allowed.
     *
     * The `websocket_allow_origin` config may be falsy (everything allowed),
     * a hostname string, an array of hostnames, or a function called as
     * `fn(hostname, parsedUrl)` whose return value is used as the verdict.
     *
     * @param  {String} origin value of the request's Origin header
     * @return {Boolean}       true when the origin is accepted
     */
    originIsAllowed: function(origin){
      var allowOrigins = C('websocket_allow_origin');
      if (!allowOrigins) {
        return true;
      }
      // A request may carry no Origin header at all; url.parse() throws on
      // non-string input, so such requests are rejected instead of raising
      // inside the upgrade listener.
      if (typeof origin !== 'string') {
        return false;
      }
      var info = url.parse(origin);
      var hostname = info.hostname;
      if (isString(allowOrigins) && allowOrigins === hostname) {
        return true;
      }
      if (isArray(allowOrigins) && allowOrigins.indexOf(hostname) > -1) {
        return true;
      }
      if (isFunction(allowOrigins)) {
        return allowOrigins(hostname, info);
      }
      return false;
    },

    /**
     * Select the sub protocol for a request.
     *
     * Reads the `websocket_sub_protocal` config. When it is a function it is
     * called with the list parsed from the `sec-websocket-protocol` header
     * and its return value is used; otherwise the config value itself is
     * returned.
     *
     * @param  {Object} request incoming upgrade request
     * @return {*}              the selected sub protocol
     */
    getSubProtocal: function(request){
      var selectedProtocal = C('websocket_sub_protocal');
      if (isFunction(selectedProtocal)) {
        var protocals = (request.headers['sec-websocket-protocol'] || '').split(/,\s*/);
        selectedProtocal = selectedProtocal(protocals);
      }
      return selectedProtocal;
    },

    /**
     * Run the application listener for the upgrade request before the
     * WebSocket handshake is completed.
     *
     * Requests for `/` skip the application and resolve with an empty object.
     * For any other URL the request is run through thinkHttp and
     * `app.listener` with a stub response that only records the value passed
     * to `setHeader('Set-Cookie', ...)`.
     *
     * @param  {Object} request  incoming upgrade request
     * @param  {*}      protocal sub protocol chosen by getSubProtocal
     * @return {Promise}         resolves with `{}` or `{cookie, http}`;
     *                           rejects when thinkHttp or the listener fails
     */
    openHandle: function(request, protocal){
      if (request.url === '/') {
        return getPromise({});
      }
      var deferred = getDefer();
      var noop = function(){};
      var cookies = '';
      // Minimal response stand-in: body output is discarded, only the
      // Set-Cookie header value is kept so it can be sent with the handshake.
      var res = {
        setHeader: function(name, value){
          if (name === 'Set-Cookie' && value) {
            cookies = value;
          }
        },
        end: noop,
        write: noop
      };
      var self = this;
      thinkHttp(request, res).run().then(function(http){
        http.websocket = request.socket;
        // expose the negotiated sub protocol on the http object
        http.websocket_sub_protocal = protocal;
        return self.app.listener(http).then(function(){
          http.sendCookie();
          deferred.resolve({
            cookie: cookies,
            http: http
          });
        });
      }).catch(function(err){
        // One handler for the whole chain, so a failure in run() itself also
        // settles the deferred instead of leaving it pending forever.
        deferred.reject(err);
      });
      return deferred.promise;
    },

    /**
     * Default message handler: treats every message as a JSON-RPC 2.0 call.
     *
     * `method` is used as the request URL and `params` as the request
     * parameters. When `params.headers` is an object it is used as the
     * request headers and `params.data` becomes the parameters.
     * Invalid payloads are answered through `connection.socket.send` with one
     * of the WebSocket.ERROR_MESSAGE codes.
     *
     * @param  {String|Buffer} message    raw message received from the client
     * @param  {Object}        connection driver created in open()
     * @param  {Object}        app        object exposing `listener(http)`
     * @return {void}
     */
    messageHandle: function(message, connection, app){
      // parse the payload
      try{
        message = JSON.parse(message);
      }catch(e){
        connection.socket.send(WebSocket.ERROR_MESSAGE.INVALID_JSON, message + ' is not valid json');
        return;
      }
      // JSON.parse('null') yields null; guard it so the property access
      // below cannot throw inside the message event handler.
      if (!message || message.jsonrpc !== '2.0') {
        connection.socket.send(WebSocket.ERROR_MESSAGE.INVALID_JSONRPC, 'data.jsonrpc must be 2.0');
        return;
      }
      var method = message.method;
      if (!method) {
        connection.socket.send(WebSocket.ERROR_MESSAGE.INVALID_METHOD, 'data.method is not valid');
        return;
      }
      var pars = message.params || '';
      var headers = {};
      if (isObject(pars.headers)) {
        headers = pars.headers;
        pars = pars.data;
      }
      var self = this;
      var data = {
        host: '',
        url: method,
        params: pars,
        headers: headers,
        // The written body is parsed as JSON and wrapped into a JSON-RPC
        // response carrying the id of the originating request.
        write: function(data, encoding, errMsg){
          var pars = self.getRPCData(JSON.parse(data), errMsg);
          pars.id = message.id;
          connection.socket.send(JSON.stringify(pars));
        },
        end: function(){
          connection.close();
        }
      };
      var defaultHttp = thinkHttp.getDefaultHttp(data);
      var httpInstance = thinkHttp(defaultHttp.req, defaultHttp.res);
      // attach the websocket instance to the http object
      httpInstance.http.websocket = connection.socket;
      // Call listener through `app` (as openHandle does) so it keeps its receiver.
      httpInstance.run().then(function(http){
        return app.listener(http);
      });
    },

    /**
     * Build a JSON-RPC 2.0 response object.
     *
     * @param  {*}      data   result value, or the error code when errMsg is set
     * @param  {String} errMsg error message; when truthy an error response is built
     * @return {Object}        `{jsonrpc, result}` or `{jsonrpc, error: {code, message}}`
     */
    getRPCData: function(data, errMsg){
      var pars = {jsonrpc: '2.0'};
      if (errMsg) {
        pars.error = {code: data, message: errMsg};
      }else{
        pars.result = data;
      }
      return pars;
    },

    /**
     * Establish the WebSocket connection for an upgrade request.
     *
     * After openHandle succeeds, a websocket-driver instance is created for
     * the request, wired to the underlying socket, and started.
     *
     * @param  {Object} request incoming upgrade request
     * @return {Promise}        resolves with the started driver
     */
    open: function(request){
      var self = this;
      var protocal = this.getSubProtocal(request);
      return this.openHandle(request, protocal).then(function(data){
        var driver = websocket.http(request, {
          protocols: protocal
        });
        var socket = driver.socket = request.socket;
        // raw send on the driver: buffers go out as binary frames, the rest as text
        driver.send = function(data){
          return isBuffer(data) ? driver.binary(data) : driver.text(data);
        };
        socket.close = function(){
          driver.close();
        };
        socket.connection = driver;
        // socket.send wraps the payload in a JSON-RPC envelope; an existing
        // send method on the socket is left untouched.
        if (!socket.send) {
          socket.send = function(data, errMsg){
            var pars = self.getRPCData(data, errMsg);
            driver.text(JSON.stringify(pars));
          };
        }
        socket.pipe(driver.io).pipe(socket);

        var messageHandle = C('websocket_message_handle');
        driver.messages.on('data', function(message) {
          socket.activeTime = Date.now();
          var type = isBuffer(message) ? 'buffer' : 'text';
          if (isFunction(messageHandle)) {
            messageHandle(message, driver, self.app, type);
          }else{
            self.messageHandle(message, driver, self.app, type);
          }
        });
        driver.on('close', function(){
          return data.http && data.http.emit('websocket.close');
        });
        // send the cookie recorded by openHandle with the handshake response
        if (!isEmpty(data.cookie)) {
          driver.setHeader('Set-Cookie', data.cookie);
        }
        driver.start();
        return driver;
      });
    },

    /**
     * Start handling `upgrade` requests on the HTTP server.
     *
     * Requests that are not WebSocket upgrades are ignored. WebSocket
     * requests from a disallowed origin are answered with 403 and closed.
     *
     * @return {void}
     */
    run: function(){
      var self = this;
      this.httpServer.on('upgrade', function(request, socket) {
        if (!websocket.isWebSocket(request)){
          return;
        }
        var origin = request.headers.origin;
        if (!self.originIsAllowed(origin)) {
          // The upgraded socket is this listener's responsibility; answer
          // and close it instead of leaving the connection dangling.
          socket.end('HTTP/1.1 403 Forbidden\r\nConnection: close\r\n\r\n');
          return;
        }
        socket.id = socketId++;
        socket.activeTime = Date.now();
        socket.setTimeout(C('websocket_timeout') * 1000);
        socket.setNoDelay(true);
        return self.open(request).catch(function(err){
          // Release the socket when the connection could not be established,
          // then re-throw so the returned promise still rejects with err.
          socket.destroy();
          throw err;
        });
      });
    }
  };
});

/**
 * Error codes sent to the client in JSON-RPC error responses.
 * @type {Object}
 */
WebSocket.ERROR_MESSAGE = {
  TYPE_ERROR: -100001, // wrong data type
  INVALID_JSON: -100002, // payload is not valid JSON
  INVALID_JSONRPC: -100003, // payload is not in JSON-RPC format
  INVALID_METHOD: -100004 // request method is not valid
};