Skip to content

Commit

Permalink
backport some of jberger's refactoring work
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Dec 14, 2015
1 parent 1151591 commit 55d299b
Showing 1 changed file with 34 additions and 39 deletions.
73 changes: 34 additions & 39 deletions lib/Mojo/UserAgent.pm
Expand Up @@ -51,12 +51,12 @@ sub start {
# Non-blocking
if ($cb) {
warn "-- Non-blocking request (@{[_url($tx)]})\n" if DEBUG;
return $self->_start(1, $tx, $cb);
return $self->_start(Mojo::IOLoop->singleton, $tx, $cb);
}

# Blocking
warn "-- Blocking request (@{[_url($tx)]})\n" if DEBUG;
$self->_start(0, $tx => sub { shift->ioloop->stop; $tx = shift });
$self->_start($self->ioloop, $tx => sub { shift->ioloop->stop; $tx = shift });
$self->ioloop->start;

return $tx;
Expand All @@ -76,7 +76,7 @@ sub _cleanup {
}

sub _connect {
my ($self, $nb, $peer, $tx, $handle, $cb) = @_;
my ($self, $loop, $peer, $tx, $handle, $cb) = @_;

my $t = $self->transactor;
my ($proto, $host, $port) = $peer ? $t->peer($tx) : $t->endpoint($tx);
Expand All @@ -99,7 +99,7 @@ sub _connect {

weaken $self;
my $id;
return $id = $self->_loop($nb)->client(
return $id = $loop->client(
%options => sub {
my ($loop, $err, $stream) = @_;

Expand All @@ -118,12 +118,12 @@ sub _connect {
}

sub _connect_proxy {
my ($self, $nb, $old, $cb) = @_;
my ($self, $loop, $old, $cb) = @_;

# Start CONNECT request
return undef unless my $new = $self->transactor->proxy_connect($old);
return $self->_start(
($nb, $new) => sub {
($loop, $new) => sub {
my ($self, $tx) = @_;

# CONNECT failed (connection needs to be kept alive)
Expand All @@ -134,15 +134,15 @@ sub _connect_proxy {
# Start real transaction
$old->req->via_proxy(0);
my $id = $tx->connection;
return $self->_start($nb, $old->connection($id), $cb)
return $self->_start($loop, $old->connection($id), $cb)
unless $tx->req->url->protocol eq 'https';

# TLS upgrade
my $handle = $self->_loop($nb)->stream($id)->steal_handle;
my $handle = $loop->stream($id)->steal_handle;
$self->_remove($id);
$id = $self->_connect($nb, 0, $old, $handle,
sub { shift->_start($nb, $old->connection($id), $cb) });
$self->{connections}{$id} = {cb => $cb, nb => $nb, tx => $old};
$id = $self->_connect($loop, 0, $old, $handle,
sub { shift->_start($loop, $old->connection($id), $cb) });
$self->{connections}{$id} = {cb => $cb, ioloop => $loop, tx => $old};
}
);
}
Expand All @@ -151,9 +151,8 @@ sub _connected {
my ($self, $id) = @_;

# Inactivity timeout
my $c = $self->{connections}{$id};
my $stream
= $self->_loop($c->{nb})->stream($id)->timeout($self->inactivity_timeout);
my $c = $self->{connections}{$id};
my $stream = $c->{ioloop}->stream($id)->timeout($self->inactivity_timeout);

# Store connection information in transaction
my $tx = $c->{tx}->connection($id);
Expand All @@ -168,11 +167,11 @@ sub _connected {
}

sub _connection {
my ($self, $nb, $tx, $cb) = @_;
my ($self, $loop, $tx, $cb) = @_;

# Reuse connection
my ($proto, $host, $port) = $self->transactor->endpoint($tx);
my $id = $tx->connection || $self->_dequeue($nb, "$proto:$host:$port", 1);
my $id = $tx->connection || $self->_dequeue($loop, "$proto:$host:$port", 1);
if ($id) {
warn "-- Reusing connection $id ($proto://$host:$port)\n" if DEBUG;
@{$self->{connections}{$id}}{qw(cb tx)} = ($cb, $tx);
Expand All @@ -182,21 +181,20 @@ sub _connection {
}

# CONNECT request to proxy required
if (my $id = $self->_connect_proxy($nb, $tx, $cb)) { return $id }
if (my $id = $self->_connect_proxy($loop, $tx, $cb)) { return $id }

# Connect
$id = $self->_connect($nb, 1, $tx, undef, \&_connected);
$id = $self->_connect($loop, 1, $tx, undef, \&_connected);
warn "-- Connect $id ($proto://$host:$port)\n" if DEBUG;
$self->{connections}{$id} = {cb => $cb, nb => $nb, tx => $tx};
$self->{connections}{$id} = {cb => $cb, ioloop => $loop, tx => $tx};

return $id;
}

sub _dequeue {
my ($self, $nb, $name, $test) = @_;
my ($self, $loop, $name, $test) = @_;

my $loop = $self->_loop($nb);
my $old = $self->{$nb ? 'nb_queue' : 'queue'} ||= [];
my $old = $self->{queue}{$loop} ||= [];
my ($found, @new);
for my $queued (@$old) {
push @new, $queued and next if $found || !grep { $_ eq $name } @$queued;
Expand All @@ -222,8 +220,7 @@ sub _finish {

# Remove request timeout
return unless my $c = $self->{connections}{$id};
my $loop = $self->_loop($c->{nb});
$loop->remove($c->{timeout}) if $c->{timeout};
$c->{ioloop}->remove($c->{timeout}) if $c->{timeout};

return $self->_reuse($id, $close) unless my $old = $c->{tx};
$old->client_close($close);
Expand All @@ -246,14 +243,11 @@ sub _finish {
$c->{cb}($self, $old) unless $self->_redirect($c, $old);
}

sub _loop { $_[1] ? Mojo::IOLoop->singleton : $_[0]->ioloop }

sub _read {
my ($self, $id, $chunk) = @_;

# Corrupted connection
return unless my $c = $self->{connections}{$id};
return $self->_remove($id) unless my $tx = $c->{tx};
return $self->_remove($id) unless my $tx = $self->{connections}{$id}{tx};

# Process incoming data
warn term_escape "-- Client <<< Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
Expand All @@ -266,14 +260,14 @@ sub _redirect {
my ($self, $c, $old) = @_;
return undef unless my $new = $self->transactor->redirect($old);
return undef unless @{$old->redirects} < $self->max_redirects;
return $self->_start($c->{nb}, $new, delete $c->{cb});
return $self->_start($c->{ioloop}, $new, delete $c->{cb});
}

sub _remove {
my ($self, $id) = @_;
my $c = delete $self->{connections}{$id};
$self->_dequeue($c->{nb}, $id);
$self->_loop($c->{nb})->remove($id);
$self->_dequeue($c->{ioloop}, $id);
$c->{ioloop}->remove($id);
}

sub _reuse {
Expand All @@ -287,29 +281,30 @@ sub _reuse {
if $close || !$tx || !$max || !$tx->keep_alive || $tx->error;

# Keep connection alive
my $queue = $self->{$c->{nb} ? 'nb_queue' : 'queue'} ||= [];
my $queue = $self->{queue}{$c->{ioloop}} ||= [];
$self->_remove(shift(@$queue)->[1]) while @$queue && @$queue >= $max;
push @$queue, [join(':', $self->transactor->endpoint($tx)), $id];
}

sub _start {
my ($self, $nb, $tx, $cb) = @_;
my ($self, $loop, $tx, $cb) = @_;

# Application server
my $url = $tx->req->url;
unless ($url->is_abs) {
my $base = $nb ? $self->server->nb_url : $self->server->url;
my $base
= $loop == $self->ioloop ? $self->server->url : $self->server->nb_url;
$url->scheme($base->scheme)->authority($base->authority);
}

$_->prepare($tx) for $self->proxy, $self->cookie_jar;

# Connect and add request timeout if necessary
my $id = $self->emit(start => $tx)->_connection($nb, $tx, $cb);
my $id = $self->emit(start => $tx)->_connection($loop, $tx, $cb);
if (my $timeout = $self->request_timeout) {
weaken $self;
$self->{connections}{$id}{timeout} = $self->_loop($nb)
->timer($timeout => sub { $self->_error($id, 'Request timeout') });
$self->{connections}{$id}{timeout}
= $loop->timer($timeout => sub { $self->_error($id, 'Request timeout') });
}

return $id;
Expand All @@ -321,13 +316,13 @@ sub _write {
my ($self, $id) = @_;

# Get and write chunk
return unless my $c = $self->{connections}{$id};
my $c = $self->{connections}{$id};
return unless my $tx = $c->{tx};
return if !$tx->is_writing || $c->{writing}++;
my $chunk = $tx->client_write;
delete $c->{writing};
warn term_escape "-- Client >>> Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
my $stream = $self->_loop($c->{nb})->stream($id)->write($chunk);
my $stream = $c->{ioloop}->stream($id)->write($chunk);
$self->_finish($id) if $tx->is_finished;

# Continue writing
Expand Down

0 comments on commit 55d299b

Please sign in to comment.