diff --git a/lib/node-http-proxy/http-proxy.js b/lib/node-http-proxy/http-proxy.js index 4ccbcab..00fc05e 100644 --- a/lib/node-http-proxy/http-proxy.js +++ b/lib/node-http-proxy/http-proxy.js @@ -224,12 +224,22 @@ HttpProxy.prototype.proxyRequest = function (req, res, buffer) { response.on('data', function (chunk) { if (req.method !== 'HEAD' && res.writable) { try { - res.write(chunk); + var flushed = res.write(chunk); } catch (ex) { console.error("res.write error: %s", ex.message); + try { res.end() } - catch (ex) { console.error("res.write error: %s", ex.message) } + catch (ex) { console.error("res.end error: %s", ex.message) } + + return; + } + + if (!flushed) { + response.pause(); + res.once('drain', function () { + response.resume(); + }); } } }); @@ -265,7 +275,13 @@ HttpProxy.prototype.proxyRequest = function (req, res, buffer) { // req.on('data', function (chunk) { if (!errState) { - reverseProxy.write(chunk); + var flushed = reverseProxy.write(chunk); + if (!flushed) { + req.pause(); + reverseProxy.once('drain', function () { + req.resume(); + }); + } } }); @@ -334,8 +350,14 @@ HttpProxy.prototype._forwardRequest = function (req) { // the proxied request come in // req.on('data', function (chunk) { - forwardProxy.write(chunk); - }) + var flushed = forwardProxy.write(chunk); + if (!flushed) { + req.pause(); + forwardProxy.once('drain', function () { + req.resume(); + }); + } + }); // // At the end of the client request, we are going to @@ -357,7 +379,7 @@ HttpProxy.prototype._forwardRequest = function (req) { // HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer) { var self = this, - outgoing = new(this.target.base); + outgoing = new(this.target.base), listeners = {}, errState = false, CRLF = '\r\n'; @@ -420,7 +442,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer) if (reverseProxy.incoming.socket.writable) { try { self.emit('websocket:outgoing', req, socket, head, data); - reverseProxy.incoming.socket.write(data); + var flushed = reverseProxy.incoming.socket.write(data); + if (!flushed) { + proxySocket.pause(); + reverseProxy.incoming.socket.once('drain', function () { + proxySocket.resume(); + }); + } } catch (ex) { detach(); @@ -437,7 +465,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer) reverseProxy.incoming.socket.on('data', listeners.onOutgoing = function (data) { try { self.emit('websocket:incoming', reverseProxy, reverseProxy.incoming, head, data); - proxySocket.write(data); + var flushed = proxySocket.write(data); + if (!flushed) { + reverseProxy.incoming.socket.pause(); + proxySocket.once('drain', function () { + reverseProxy.incoming.socket.resume(); + }); + } } catch (ex) { detach(); @@ -595,7 +629,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer) // self.emit('websocket:handshake', req, socket, head, sdata, data); socket.write(sdata); - socket.write(data); + var flushed = socket.write(data); + if (!flushed) { + reverseProxy.socket.pause(); + socket.once('drain', function () { + reverseProxy.socket.resume(); + }); + } } catch (ex) { // @@ -620,7 +660,9 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer) try { // - // Attempt to write the upgrade-head to the reverseProxy request. + // Attempt to write the upgrade-head to the reverseProxy + // request. This is small, and there's only ever one of + // it; no need for pause/resume. // reverseProxy.write(head); }