diff --git a/lib/node-http-proxy.js b/lib/node-http-proxy.js index 0c69585..27b1867 100644 --- a/lib/node-http-proxy.js +++ b/lib/node-http-proxy.js @@ -553,17 +553,29 @@ HttpProxy.prototype._forwardRequest = function (req) { }; HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options) { - var self = this, outgoing, errState = false, CRLF = '\r\n'; + var self = this, + listeners = {}, + errState = false, + CRLF = '\r\n', + outgoing; - // WebSocket requests has method = GET + // + // WebSocket requests must have the `GET` method and + // the `upgrade:websocket` header + // if (req.method !== 'GET' || req.headers.upgrade.toLowerCase() !== 'websocket') { + // // This request is not WebSocket request + // return; } - // Turn of all bufferings - // For server set KeepAlive - // For client set encoding + // + // Helper function for setting appropriate socket values: + // 1. Turn of all bufferings + // 2. For server set KeepAlive + // 3. For client set encoding + // function _socket(socket, keepAlive) { socket.setTimeout(0); socket.setNoDelay(true); @@ -580,20 +592,25 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options } } - function onUpgrade(reverseProxy, proxySocket) { + // + // On `upgrade` from the Agent socket, listen to + // the appropriate events. + // + function onUpgrade (reverseProxy, proxySocket) { if (!reverseProxy) { proxySocket.end(); socket.end(); return; } - var listeners = {}; - - // We're now connected to the server, so lets change server socket - proxySocket.on('data', listeners._r_data = function(data) { - // Pass data to client + // + // Any incoming data on this WebSocket to the proxy target + // will be written to the `reverseProxy` socket. + // + proxySocket.on('data', listeners.onIncoming = function (data) { if (reverseProxy.incoming.socket.writable) { try { + self.emit('websocket:outgoing', req, socket, head, data); reverseProxy.incoming.socket.write(data); } catch (e) { @@ -603,9 +620,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options } }); - reverseProxy.incoming.socket.on('data', listeners._data = function(data) { - // Pass data from client to server + // + // Any outgoing data on this Websocket from the proxy target + // will be written to the `proxySocket` socket. + // + reverseProxy.incoming.socket.on('data', listeners.onOutgoing = function(data) { try { + self.emit('websocket:incoming', reverseProxy, reverseProxy.incoming, head, data); proxySocket.write(data); } catch (e) { @@ -613,41 +634,60 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options socket.end(); } }); - - // Detach event listeners from reverseProxy + + // + // Helper function to detach all event listeners + // from `reverseProxy` and `proxySocket`. + // function detach() { - proxySocket.removeListener('end', listeners._r_close); - proxySocket.removeListener('data', listeners._r_data); - reverseProxy.incoming.socket.removeListener('data', listeners._data); - reverseProxy.incoming.socket.removeListener('end', listeners._close); + proxySocket.removeListener('end', listeners.onIncomingClose); + proxySocket.removeListener('data', listeners.onIncoming); + reverseProxy.incoming.socket.removeListener('end', listeners.onOutgoingClose); + reverseProxy.incoming.socket.removeListener('data', listeners.onOutgoing); } - // Hook disconnections - proxySocket.on('end', listeners._r_close = function() { + // + // If the incoming `proxySocket` socket closes, then + // detach all event listeners. + // + proxySocket.on('end', listeners.onIncomingClose = function() { reverseProxy.incoming.socket.end(); detach(); + + // Emit the `end` event now that we have completed proxying + self.emit('websocket:end', req, socket, head); }); - reverseProxy.incoming.socket.on('end', listeners._close = function() { + // + // If the `reverseProxy` socket closes, then detach all + // event listeners. + // + reverseProxy.incoming.socket.on('end', listeners.onOutgoingClose = function() { proxySocket.end(); detach(); }); }; - // Client socket + // Setup the incoming client socket. _socket(socket); - // Remote host address + // + // Get the protocol, and host for this request and create an instance + // of `http.Agent` or `https.Agent` from the pool managed by `node-http-proxy`. + // var protocolName = options.https || this.target.https ? 'https' : 'http', - agent = _getAgent(options.host, options.port, options.https || this.target.https), - remoteHost = options.host + (options.port - 80 === 0 ? '' : ':' + options.port); + remoteHost = options.host + (options.port - 80 === 0 ? '' : ':' + options.port), + agent = _getAgent(options.host, options.port, options.https || this.target.https); - // Change headers + // Change headers (if requested). if (this.changeOrigin) { req.headers.host = remoteHost; req.headers.origin = protocolName + '://' + remoteHost; } + // + // Make the outgoing WebSocket request + // outgoing = { host: options.host, port: options.port, @@ -655,10 +695,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options path: req.url, headers: req.headers, }; - - // Make the outgoing WebSocket request var reverseProxy = agent.appendMessage(outgoing); + // + // On any errors from the `reverseProxy` emit the + // `webSocketProxyError` and close the appropriate + // connections. + // function proxyError (err) { reverseProxy.end(); if (self.emit('webSocketProxyError', req, socket, head)) { @@ -703,7 +746,7 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options // // If the reverseProxy connection has an underlying socket, - // then behing the handshake. + // then execute the WebSocket handshake. // if (typeof reverseProxy.socket !== 'undefined') { reverseProxy.socket.on('data', function handshake (data) { @@ -741,6 +784,7 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options // Write the printable and non-printable data to the socket // from the original incoming request. // + self.emit('websocket:handshake', req, socket, head, sdata, data); socket.write(sdata); socket.write(data); } diff --git a/test/helpers.js b/test/helpers.js index 0c326fa..956e548 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -12,6 +12,7 @@ var fs = require('fs'), vows = require('vows'), assert = require('assert'), request = require('request'), + websocket = require('./../vendor/websocket'), httpProxy = require('./../lib/node-http-proxy'); function merge (target) { @@ -120,6 +121,39 @@ TestRunner.prototype.assertResponseCode = function (proxyPort, statusCode, creat return test; }; +TestRunner.prototype.webSocketTest = function (options) { + var self = this; + + this.startTargetServer(options.ports.target, 'hello websocket', function (err, target) { + var socket = options.io.listen(target); + + if (options.onListen) { + options.onListen(socket); + } + + self.startProxyServer( + options.ports.proxy, + options.ports.target, + options.host, + function (err, proxy) { + if (options.onServer) { options.onServer(proxy) } + + // + // Setup the web socket against our proxy + // + var uri = options.wsprotocol + '://' + options.host + ':' + options.ports.proxy; + var ws = new websocket.WebSocket(uri + '/socket.io/websocket/', 'borf', { + origin: options.protocol + '://' + options.host + }); + + if (options.onWsupgrade) { ws.on('wsupgrade', options.onWsupgrade) } + if (options.onMessage) { ws.on('message', options.onMessage) } + if (options.onOpen) { ws.on('open', function () { options.onOpen(ws) }) } + } + ); + }); +} + // // Creates the reverse proxy server // diff --git a/test/node-http-proxy-test.js b/test/node-http-proxy-test.js index 4da2429..f05ae5a 100644 --- a/test/node-http-proxy-test.js +++ b/test/node-http-proxy-test.js @@ -24,11 +24,11 @@ */ -var vows = require('vows'), +var assert = require('assert'), util = require('util'), - request = require('request'), - assert = require('assert'), argv = require('optimist').argv, + request = require('request'), + vows = require('vows'), helpers = require('./helpers'); var forwardOptions = { diff --git a/test/proxy-table-test.js b/test/proxy-table-test.js index ff83066..359c82c 100644 --- a/test/proxy-table-test.js +++ b/test/proxy-table-test.js @@ -5,14 +5,14 @@ * */ -var fs = require('fs'), - vows = require('vows'), - util = require('util'), +var assert = require('assert'), + fs = require('fs'), path = require('path'), - request = require('request'), - assert = require('assert'), - helpers = require('./helpers'), + util = require('util'), argv = require('optimist').argv, + request = require('request'), + vows = require('vows'), + helpers = require('./helpers'), TestRunner = helpers.TestRunner; var protocol = argv.https ? 'https' : 'http', diff --git a/test/web-socket-proxy-test.js b/test/web-socket-proxy-test.js index d954ef5..b02075a 100644 --- a/test/web-socket-proxy-test.js +++ b/test/web-socket-proxy-test.js @@ -24,12 +24,12 @@ */ -var vows = require('vows'), - util = require('util'), - colors = require('colors'), - request = require('request'), +var util = require('util'), assert = require('assert'), argv = require('optimist').argv, + colors = require('colors'), + request = require('request'), + vows = require('vows'), websocket = require('./../vendor/websocket'), helpers = require('./helpers'); @@ -38,8 +38,8 @@ try { io = require('socket.io'); } catch (ex) { - console.error('Socket.io is required for this test:'); - console.error('npm ' + 'install'.green + ' socket.io'.magenta); + console.error('Socket.io is required for this example:'); + console.error('npm ' + 'install'.green + ' socket.io@0.6.18'.magenta); process.exit(1); } @@ -52,35 +52,32 @@ vows.describe('node-http-proxy/websocket/' + wsprotocol).addBatch({ "with no latency" : { "when an inbound message is sent from a WebSocket client": { topic: function () { - var that = this; + var that = this + headers = {}; - runner.startTargetServer(8130, 'hello websocket', function (err, target) { - var socket = io.listen(target), - headers = {}; - - socket.on('connection', function (client) { - client.on('message', function (msg) { - that.callback(null, msg, headers); + runner.webSocketTest({ + io: io, + host: 'localhost', + wsprotocol: wsprotocol, + protocol: protocol, + ports: { + target: 8130, + proxy: 8131 + }, + onListen: function (socket) { + socket.on('connection', function (client) { + client.on('message', function (msg) { + that.callback(null, msg, headers); + }); }); - }); - - runner.startProxyServer(8131, 8130, 'localhost', function (err, proxy) { - // - // Setup the web socket against our proxy - // - var ws = new websocket.WebSocket(wsprotocol + '://localhost:8131/socket.io/websocket/', 'borf', { - origin: protocol + '://localhost' - }); - - ws.on('wsupgrade', function (req, res) { - headers.request = req; - headers.response = res.headers; - }); - - ws.on('open', function () { - ws.send(utils.encode('from client')); - }); - }); + }, + onWsupgrade: function (req, res) { + headers.request = req; + headers.response = res.headers; + }, + onOpen: function (ws) { + ws.send(utils.encode('from client')); + } }); }, "the target server should receive the message": function (err, msg, headers) { @@ -92,38 +89,63 @@ vows.describe('node-http-proxy/websocket/' + wsprotocol).addBatch({ assert.equal(headers.request.Origin, headers.response['sec-websocket-origin']); } }, + "when an inbound message is sent from a WebSocket client with event listeners": { + topic: function () { + var that = this + headers = {}; + + runner.webSocketTest({ + io: io, + host: 'localhost', + wsprotocol: wsprotocol, + protocol: protocol, + ports: { + target: 8132, + proxy: 8133 + }, + onServer: function (server) { + server.proxy.on('websocket:incoming', function (req, socket, head, data) { + that.callback(null, data); + }); + }, + onOpen: function (ws) { + ws.send(utils.encode('from client')); + } + }); + }, + "should raise the `websocket:incoming` event": function (ign, data) { + assert.equal(utils.decode(data.toString().replace('\u0000', '')), 'from client'); + }, + }, "when an outbound message is sent from the target server": { topic: function () { - var that = this; + var that = this, + headers = {}; - runner.startTargetServer(8132, 'hello websocket', function (err, target) { - var socket = io.listen(target), - headers = {}; - - socket.on('connection', function (client) { - socket.broadcast('from server'); - }); - - runner.startProxyServer(8133, 8132, 'localhost', function (err, proxy) { - // - // Setup the web socket against our proxy - // - var ws = new websocket.WebSocket(wsprotocol + '://localhost:8133/socket.io/websocket/', 'borf', { - origin: protocol + '://localhost' + runner.webSocketTest({ + io: io, + host: 'localhost', + wsprotocol: wsprotocol, + protocol: protocol, + ports: { + target: 8134, + proxy: 8135 + }, + onListen: function (socket) { + socket.on('connection', function (client) { + socket.broadcast('from server'); }); - - ws.on('wsupgrade', function (req, res) { - headers.request = req; - headers.response = res.headers; - }); - - ws.on('message', function (msg) { - msg = utils.decode(msg); - if (!/\d+/.test(msg)) { - that.callback(null, msg, headers); - } - }); - }); + }, + onWsupgrade: function (req, res) { + headers.request = req; + headers.response = res.headers; + }, + onMessage: function (msg) { + msg = utils.decode(msg); + if (!/\d+/.test(msg)) { + that.callback(null, msg, headers); + } + } }); }, "the client should receive the message": function (err, msg, headers) {