Skip to content

Commit

Permalink
[api] proxystream
Browse files Browse the repository at this point in the history
  • Loading branch information
yawnt committed Jun 24, 2013
1 parent 44097a8 commit 09fb14e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 245 deletions.
240 changes: 2 additions & 238 deletions lib/node-http-proxy/http-proxy.js
Expand Up @@ -29,6 +29,7 @@ var events = require('events'),
util = require('util'),
url = require('url'),
ForwardStream = require('./streams/forward'),
ProxyStream = require('./streams/proxy'),
httpProxy = require('../node-http-proxy');

//
Expand Down Expand Up @@ -163,6 +164,7 @@ HttpProxy.prototype.proxyRequest = function (req, res, buffer) {
// Emit the `start` event indicating that we have begun the proxy operation.
//
this.emit('start', req, res, this.target);
req.pipe(new ProxyStream(res, options)).pipe(res);

This comment has been minimized.

Copy link
@cronopio

cronopio Jul 11, 2013

Contributor

Here the `options` variable is undefined. I just changed it to `this`, because that object has the `source` and `target` fields required by ProxyStream(), and now I'm seeing a problem in the options passed to request. More info here: 44097a8#commitcomment-3615874


//
// If forwarding is enabled for this instance, forward proxy the
Expand All @@ -172,244 +174,6 @@ HttpProxy.prototype.proxyRequest = function (req, res, buffer) {
this.emit('forward', req, res, this.forward);
req.pipe(new ForwardStream(this.forward));
}

//
// #### function proxyError (err)
// #### @err {Error} Error contacting the proxy target
// Short-circuits `res` in the event of any error when
// contacting the proxy target at `host` / `port`.
//
function proxyError(err) {
  // Record the failure first; the other handlers in this scope check
  // `errState` before touching `res` or `reverseProxy`.
  errState = true;

  //
  // Offer the error to the application. A truthy result from `emit` is
  // treated as "handled": the application's handler is then expected to
  // end the response itself, so nothing more is written here.
  //
  var handledByApp = self.emit('proxyError', err, req, res);
  if (handledByApp) {
    return;
  }

  res.writeHead(500, { 'Content-Type': 'text/plain' });

  //
  // No body is written for HEAD requests. For other methods the
  // serialized error is only exposed outside production; this
  // NODE_ENV=production behavior mimics Express and Connect.
  //
  if (req.method !== 'HEAD') {
    var body;
    if (process.env.NODE_ENV === 'production') {
      body = 'Internal Server Error';
    } else {
      body = 'An error has occurred: ' + JSON.stringify(err);
    }
    res.write(body);
  }

  // Ending the response may itself throw; log that instead of propagating it.
  try {
    res.end();
  } catch (ex) {
    console.error("res.end error: %s", ex.message);
  }
}

//
// Setup outgoing proxy with relevant properties.
//
httpProxy._setupOutgoing(outgoing, this.target, req);

//
// If the changeOrigin option is specified, change the
// origin of the host header to the target URL! Please
// don't revert this without documenting it!
//
if (this.changeOrigin) {
outgoing.headers.host = this.target.host + ':' + this.target.port;
}

//
// Open a new HTTP request to the internal resource, which will act
// as a reverse proxy pass
//
reverseProxy = this.target.protocol.request(outgoing, function (response) {
//
// Process the `reverseProxy` `response` when it's received.
//
if (req.httpVersion === '1.0') {
if (req.headers.connection) {
response.headers.connection = req.headers.connection;
} else {
response.headers.connection = 'close';
}
} else if (!response.headers.connection) {
if (req.headers.connection) { response.headers.connection = req.headers.connection }
else {
response.headers.connection = 'keep-alive';
}
}

// Remove `Transfer-Encoding` header if client's protocol is HTTP/1.0
// or if this is a DELETE request with no content-length header.
// See: https://github.com/nodejitsu/node-http-proxy/pull/373
if (req.httpVersion === '1.0' || (req.method === 'DELETE'
&& !req.headers['content-length'])) {
delete response.headers['transfer-encoding'];
}

if ((response.statusCode === 301 || response.statusCode === 302)
&& typeof response.headers.location !== 'undefined') {
location = url.parse(response.headers.location);
if (location.host === req.headers.host) {
if (self.source.https && !self.target.https) {
response.headers.location = response.headers.location.replace(/^http\:/, 'https:');
}
if (self.target.https && !self.source.https) {
response.headers.location = response.headers.location.replace(/^https\:/, 'http:');
}
}
}

//
// When the `reverseProxy` `response` ends, end the
// corresponding outgoing `res` unless we have entered
// an error state. In which case, assume `res.end()` has
// already been called and the 'error' event listener
// removed.
//
var ended = false;
response.on('close', function () {
if (!ended) { response.emit('end') }
});

//
// After reading a chunked response, the underlying socket
// will hit EOF and emit a 'end' event, which will abort
// the request. If the socket was paused at that time,
// pending data gets discarded, truncating the response.
// This code makes sure that we flush pending data.
//
response.connection.on('end', function () {
if (response.readable && response.resume) {
response.resume();
}
});

response.on('end', function () {
ended = true;
if (!errState) {
try { res.end() }
catch (ex) { console.error("res.end error: %s", ex.message) }

// Emit the `end` event now that we have completed proxying
self.emit('end', req, res, response);
}
});

// Allow observer to modify headers or abort response
try { self.emit('proxyResponse', req, res, response) }
catch (ex) {
errState = true;
return;
}

// Set the headers of the client response
Object.keys(response.headers).forEach(function (key) {
res.setHeader(key, response.headers[key]);
});
res.writeHead(response.statusCode);

function ondata(chunk) {
  // Nothing to do once the client response can no longer be written to.
  if (!res.writable) {
    return;
  }

  // `write` returning exactly `false` is the signal that the outgoing
  // buffers are full.
  var flushed = res.write(chunk);
  if (flushed !== false) {
    return;
  }

  // Only pause if the underlying buffers are full,
  // *and* the connection is not in 'closing' state.
  // Otherwise, the pause will cause pending data to
  // be discarded and silently lost.
  if (response.pause && response.connection.readable) {
    response.pause();
  }
}

response.on('data', ondata);

function ondrain() {
  // Called when the client response has drained: let the upstream
  // response flow again, provided it is still readable and supports
  // `resume`.
  var canResume = response.readable && response.resume;
  if (!canResume) {
    return;
  }
  response.resume();
}

res.on('drain', ondrain);
});

//
// Handle 'error' events from the `reverseProxy`. Setup timeout override if needed
//
reverseProxy.once('error', proxyError);

// Set a timeout on the socket if `this.timeout` is specified.
reverseProxy.once('socket', function (socket) {
if (self.timeout) {
socket.setTimeout(self.timeout);
}
});

//
// Handle 'error' events from the `req` (e.g. `Parse Error`).
//
req.on('error', proxyError);

//
// If `req` is aborted, we abort our `reverseProxy` request as well.
//
req.on('aborted', function () {
reverseProxy.abort();
});

//
// For each data `chunk` received from the incoming
// `req` write it to the `reverseProxy` request.
//
req.on('data', function (chunk) {
if (!errState) {
var flushed = reverseProxy.write(chunk);
if (!flushed) {
req.pause();
reverseProxy.once('drain', function () {
try { req.resume() }
catch (er) { console.error("req.resume error: %s", er.message) }
});

//
// Force the `drain` event in 100ms if it hasn't
// happened on its own.
//
setTimeout(function () {
reverseProxy.emit('drain');
}, 100);
}
}
});

//
// When the incoming `req` ends, end the corresponding `reverseProxy`
// request unless we have entered an error state.
//
req.on('end', function () {
if (!errState) {
reverseProxy.end();
}
});

//Aborts reverseProxy if client aborts the connection.
req.on('close', function () {
if (!errState) {
reverseProxy.abort();
}
});

//
// If we have been passed buffered data, resume it.
//
if (buffer) {
return !errState
? buffer.resume()
: buffer.destroy();
}
};

//
Expand Down
90 changes: 83 additions & 7 deletions lib/node-http-proxy/streams/proxy.js
Expand Up @@ -5,18 +5,20 @@ var Duplex = require('stream').Duplex,
url = require('url'),
util = require('util');

var ProxyStream = module.exports = function ProxyStream(target, changeOrigin) {
var ProxyStream = module.exports = function ProxyStream(response, options) {
Duplex.call(this);

var self = this;

var self = this,
target = options.target,
source = options.source;

this.once('pipe', function(req) {
var protocol = target.https ? https : http,
outgoing = proxy._getBase(target);

proxy._setupOutgoing(outgoing, target, req);

if (changeOrigin) {
if (options.changeOrigin) {
outgoing.headers.host = target.host + ':' + target.port;
}

Expand All @@ -29,14 +31,81 @@ var ProxyStream = module.exports = function ProxyStream(target, changeOrigin) {
}

if(req.httpVersion === '1.0' || (req.method === 'DELETE' && !req.headers['content-length'])) {
delete response.headers['transfer-encoding'];
delete res.headers['transfer-encoding'];
}

if(~[301,302].indexOf(res.statusCode) && typeof response.headers.location !== 'undefined') {
location = url.parse(response.headers.location);
if(~[301,302].indexOf(res.statusCode) && typeof res.headers.location !== 'undefined') {
var location = url.parse(res.headers.location);
if (
location.host === req.headers.host &&
(
source.https && !target.https ||
target.https && !source.https
)
) {
res.headers.location = res.headers.location.replace(/^https\:/, 'http:');
}
}

try {

This comment has been minimized.

Copy link
@mmalecki

mmalecki Jul 29, 2013

Contributor

This try .. catch is very confusing to people who implement this event and might cause deoptimization. Let's get rid of it and just emit instead.

self.emit('proxyResponse', req, response, res);
} catch (e) {}

Object.keys(res.headers).forEach(function (key) {
response.setHeader(key, res.headers[key]);
});
response.writeHead(response.statusCode);
});

/*
//
// Handle 'error' events from the `reverseProxy`. Setup timeout override if needed
//
self.request.once('error', proxyError);
// Set a timeout on the socket if `this.timeout` is specified.
reverseProxy.once('socket', function (socket) {
if (self.timeout) {
socket.setTimeout(self.timeout);
}
}); */

/*
//
// #### function proxyError (err)
// #### @err {Error} Error contacting the proxy target
// Short-circuits `res` in the event of any error when
// contacting the proxy target at `host` / `port`.
//
function proxyError(err) {
errState = true;
//
// Emit an `error` event, allowing the application to use custom
// error handling. The error handler should end the response.
//
if (self.emit('proxyError', err, req, res)) {
return;
}
res.writeHead(500, { 'Content-Type': 'text/plain' });
if (req.method !== 'HEAD') {
//
// This NODE_ENV=production behavior is mimics Express and
// Connect.
//
res.write(process.env.NODE_ENV === 'production'
? 'Internal Server Error'
: 'An error has occurred: ' + JSON.stringify(err)
);
}
try { res.end() }
catch (ex) { console.error("res.end error: %s", ex.message) }
}
*/
});

};
Expand All @@ -45,4 +114,11 @@ ForwardStream.prototype._write = function(chunk, encoding, callback) {
this.request.write(chunk, encoding, callback);
};

//
// #### function _read (size)
// #### @size {number} Advisory amount of data requested (not used here)
// Readable-side hook: takes one chunk from `this.request` and pushes it
// to this stream's consumers when a chunk is available.
//
ForwardStream.prototype._read = function(size) {
  // Use `this`, not `self`: `self` is a local of the constructor's
  // closure and is not in scope inside prototype methods.
  // NOTE(review): assumes `this.request` exposes a stream-style `read()`
  // that returns `null` when no data is buffered -- confirm.
  var chunk = this.request.read();
  if(chunk !== null) {
    this.push(chunk);
  }
};

util.inherits(ForwardStream, Duplex);

0 comments on commit 09fb14e

Please sign in to comment.