Skip to content

Commit

Permalink
queue connections based on ioloop
Browse files Browse the repository at this point in the history
  • Loading branch information
jberger committed Dec 13, 2015
1 parent 8af2f90 commit f6dc532
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions lib/Mojo/UserAgent.pm
Expand Up @@ -74,13 +74,12 @@ sub _cleanup {

# Clean up active connections (by closing them)
delete $self->{pid};
$self->_finish($_, 1) for keys %{$self->{connections} || {}};
my $connections = $self->{connections} || {};
$self->_finish($_, 1) for keys %$connections;

# Clean up keep-alive connections
my $loop = $self->_loop(0);
$loop->remove($_->[1]) for @{delete $self->{queue} || []};
$loop = $self->_loop(1);
$loop->remove($_->[1]) for @{delete $self->{nb_queue} || []};
$connections->{$_->[1]}->ioloop->remote($_->[1]) for values %$connections;
$self->{queue} = {};

return $self;
}
Expand Down Expand Up @@ -153,7 +152,7 @@ sub _connect_proxy {
my $loop = $self->_loop($nb);
my $handle = $loop->stream($id)->steal_handle;
$self->_remove($id);
my $c = Mojo::Channel::HTTP::Client->new(cb => $cb, nb => $nb, ioloop => $loop, tx => $old);
my $c = Mojo::Channel::HTTP::Client->new(cb => $cb, ioloop => $loop, tx => $old);
$id = $self->_connect($c, 0, $handle,
sub { shift->_start($nb, $old->connection($id), $cb) });
$self->{connections}{$id} = $c;
Expand Down Expand Up @@ -182,10 +181,11 @@ sub _connected {

sub _connection {
my ($self, $nb, $tx, $cb) = @_;
my $loop = $self->_loop($nb);

# Reuse connection
my ($proto, $host, $port) = $self->transactor->endpoint($tx);
my $id = $tx->connection || $self->_dequeue($nb, "$proto:$host:$port", 1);
my $id = $tx->connection || $self->_dequeue($loop, "$proto:$host:$port", 1);
if ($id) {
warn "-- Reusing connection $id ($proto://$host:$port)\n" if DEBUG;
@{$self->{connections}{$id}}{qw(cb tx)} = ($cb, $tx);
Expand All @@ -198,7 +198,7 @@ sub _connection {
if (my $id = $self->_connect_proxy($nb, $tx, $cb)) { return $id }

# Connect
my $c = Mojo::Channel::HTTP::Client->new(cb => $cb, nb => $nb, ioloop => $self->_loop($nb), tx => $tx);
my $c = Mojo::Channel::HTTP::Client->new(cb => $cb, ioloop => $loop, tx => $tx);
$id = $self->_connect($c, 1, undef, \&_connected);
warn "-- Connect $id ($proto://$host:$port)\n" if DEBUG;
$self->{connections}{$id} = $c;
Expand All @@ -207,10 +207,9 @@ sub _connection {
}

sub _dequeue {
my ($self, $nb, $name, $test) = @_;
my ($self, $loop, $name, $test) = @_;

my $loop = $self->_loop($nb);
my $old = $self->{$nb ? 'nb_queue' : 'queue'} ||= [];
my $old = $self->{queue}{$loop} ||= [];
my ($found, @new);
for my $queued (@$old) {
push @new, $queued and next if $found || !grep { $_ eq $name } @$queued;
Expand Down Expand Up @@ -262,6 +261,8 @@ sub _finish {

sub _loop { $_[1] ? Mojo::IOLoop->singleton : $_[0]->ioloop }

sub _nb { $_[1] ? ($_[1]->ioloop == Mojo::IOLoop->singleton) : undef }

sub _read {
my ($self, $id, $chunk) = @_;

Expand All @@ -280,13 +281,13 @@ sub _redirect {
my ($self, $c, $old) = @_;
return undef unless my $new = $self->transactor->redirect($old);
return undef unless @{$old->redirects} < $self->max_redirects;
return $self->_start($c->{nb}, $new, delete $c->{cb});
return $self->_start($self->_nb($c), $new, delete $c->{cb});
}

sub _remove {
my ($self, $id) = @_;
my $c = delete $self->{connections}{$id};
$self->_dequeue($c->{nb}, $id);
$self->_dequeue($c->ioloop, $id);
$c->ioloop->remove($id);
}

Expand All @@ -301,7 +302,7 @@ sub _reuse {
if $close || !$tx || !$max || !$tx->keep_alive || $tx->error;

# Keep connection alive
my $queue = $self->{$c->{nb} ? 'nb_queue' : 'queue'} ||= [];
my $queue = $self->{queue}{$c->ioloop} ||= [];
$self->_remove(shift(@$queue)->[1]) while @$queue && @$queue >= $max;
push @$queue, [join(':', $self->transactor->endpoint($tx)), $id];
}
Expand All @@ -322,7 +323,8 @@ sub _start {
my $id = $self->emit(start => $tx)->_connection($nb, $tx, $cb);
if (my $timeout = $self->request_timeout) {
weaken $self;
$self->{connections}{$id}{timeout} = $self->_loop($nb)
my $c = $self->{connections}{$id};
$c->{timeout} = $c->ioloop
->timer($timeout => sub { $self->_error($id, 'Request timeout') });
}

Expand Down

0 comments on commit f6dc532

Please sign in to comment.