diff --git a/lib/caronte/index.js b/lib/caronte/index.js index 8339fae..586ed3f 100644 --- a/lib/caronte/index.js +++ b/lib/caronte/index.js @@ -56,7 +56,7 @@ function createRightProxy(type) { var evnt = ev + pass.name.toLowerCase(); options.ee.emit(evnt + 'begin', req, res); - var val = pass(req, res, options); + var val = pass(req, res, options, head); options.ee.emit(evnt + 'end'); return val; diff --git a/lib/caronte/passes/ws.js b/lib/caronte/passes/ws.js index 827617b..8a35155 100644 --- a/lib/caronte/passes/ws.js +++ b/lib/caronte/passes/ws.js @@ -28,10 +28,22 @@ function checkMethodAndHeader (req, res, options) { } if (req.headers.upgrade.toLowerCase() !== 'websocket') { - req.end(); return true; + res.destroy(); return true; } }, +/** + * Setup socket + * + */ + +function setupSocket(req, res) { + res.setTimeout(0); + res.setNoDelay(true); + + res.setKeepAlive(true, 0); +}, + /** * Sets `x-forwarded-*` headers if specified in config. * @@ -58,8 +70,8 @@ function XHeaders(req, res, options) { * * */ -function stream(req, res, options, instance) { - req.pipe(new WebsocketStream(options, instance)).pipe(res); +function stream(req, res, options, head) { + req.pipe(new WebsocketStream(options, head)).pipe(res); } ] // <-- diff --git a/lib/caronte/streams/websocket.js b/lib/caronte/streams/websocket.js index dc43daf..e7c71b3 100644 --- a/lib/caronte/streams/websocket.js +++ b/lib/caronte/streams/websocket.js @@ -8,8 +8,9 @@ module.exports = WebsocketStream; function WebsocketStream(options, res) { Duplex.call(this); - this.options = options; - this.res = res; + this.options = options; + this.res = res; + this.handshakeDone = false; var self = this; @@ -28,9 +29,13 @@ WebsocketStream.prototype.onPipe = function(req) { common.setupOutgoing(self.options.ssl || {}, self.options, req) ); - this.proxyReq.once('response', function(proxyRes) { - self.onResponse(proxyRes); + this.proxyReq.once('socket', function(proxySocket) { + self.onSocket(proxySocket); }); + this.proxyReq.on('upgrade', function(proxyRes, proxySocket, proxyHead) { + self.onUpgrade(proxyRes, proxySocket, proxyHead); + }); + this.proxyReq.on('error', function(e) { self.onError(e); }); @@ -40,8 +45,25 @@ WebsocketStream.prototype.onFinish = function() { this.proxyReq.end(); }; -WebsocketStream.prototype.onResponse = function(proxyRes) { - this.proxyRes = proxyRes; +WebsocketStream.prototype.onSocket = function(proxySocket) { + + +}; + +WebsocketStream.prototype.onUpgrade = function(proxyRes, proxySocket, proxyHead) { + this.handshake = { + headers : proxyRes.headers, + statusCode : proxyRes.statusCode + }; + + this.proxyRes = proxyRes; + this.proxySocket = proxySocket; + this.proxyHead = proxyHead; + + proxySocket.setTimeout(0); + proxySocket.setNoDelay(true); + + proxySocket.setKeepAlive(true, 0); }; @@ -52,9 +74,42 @@ WebsocketStream.prototype.onError = function(e) { WebsocketStream.prototype._write = function(chunk, encoding, callback) { - + this.proxySocket.write(chunk, encoding, callback); }; WebsocketStream.prototype._read = function(size) { + var chunk = (this.proxySocket ? this.proxySocket.read(size) : '') || ''; + + if(chunk && !this.handshakeDone) { + var headers = ''; -}; \ No newline at end of file + if (this.handshake.statusCode && this.handshake.statusCode == 101) { + headers = [ + 'HTTP/1.1 101 Switching Protocols', + 'Upgrade: websocket', + 'Connection: Upgrade', + 'Sec-WebSocket-Accept: ' + this.handshake.headers['sec-websocket-accept'] + ]; + + headers = headers.concat('', '').join('\r\n'); + } + + /* + * Socket.IO specific code + */ + + var sdata = chunk.toString(); + sdata = sdata.substr(0, sdata.search(CRLF + CRLF)); + chunk = data.slice(Buffer.byteLength(sdata), data.length); + + if (self.source.https && !self.target.https) { sdata = sdata.replace('ws:', 'wss:'); } + + this.push(headers + sdata); + this.push(data); + + this.handshakeDone = true; + return; + } + + this.push(chunk); +};