Skip to content

Commit

Permalink
fixed keep alive connection timeout bug in Mojo::UserAgent
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Nov 6, 2013
1 parent 8a5c06c commit abd72f8
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 40 deletions.
1 change: 1 addition & 0 deletions Changes
@@ -1,6 +1,7 @@

4.54 2013-11-06
- Added parts attribute to Mojo::Home.
- Fixed keep alive connection timeout bug in Mojo::UserAgent.
- Fixed support for links within a page in Mojolicious::Plugin::PODRenderer.
- Fixed home detection bug in Mojo.

Expand Down
81 changes: 41 additions & 40 deletions lib/Mojo/UserAgent.pm
Expand Up @@ -148,37 +148,6 @@ sub websocket {
$self->start($self->build_websocket_tx(@_), $cb);
}

# Keep-alive connection cache with two modes:
#
#   $self->_cache($name, $id);   # enqueue connection $id under endpoint $name
#   my $id = $self->_cache($x);  # dequeue by endpoint name or connection id
#
# Enqueueing returns nothing. Dequeueing returns the id of the first usable
# connection whose name or id equals the argument, or undef if there is none.
# Every entry is a two element array reference [$name, $id].
sub _cache {
  my ($self, $name, $id) = @_;

  # The array is only ever modified in place, so references to it held by
  # outer calls (enqueue -> _remove -> dequeue) never go stale
  my $cache = $self->{cache} ||= [];

  # Enqueue and enforce connection limit
  if ($id) {
    my $max = $self->max_connections;

    # Make room first, so no more than $max connections are ever retained
    $self->_remove(shift(@$cache)->[1]) while @$cache && @$cache >= $max;
    push @$cache, [$name, $id] if $max;
    return;
  }

  # Dequeue
  my $found;
  my $loop = $self->_loop;
  my $i    = 0;
  while ($i < @$cache) {
    my $cached = $cache->[$i];

    # Neither id nor name match, keep the entry and move on
    unless (grep { $_ eq $name } @$cached) { $i++; next }

    # Drop the entry before touching the stream, so anything triggered by
    # closing it already sees a consistent cache
    splice @$cache, $i, 1;

    # Stream is already gone
    next unless my $stream = $loop->stream($cached->[1]);

    # Readable while idle is treated as corrupted, close and keep searching
    if ($stream->is_readable) { $stream->close; next }

    $found = $cached->[1];
    last;
  }

  return $found;
}

sub _cleanup {
my $self = shift;
return unless my $loop = $self->_loop;
Expand All @@ -187,7 +156,7 @@ sub _cleanup {
$self->_handle($_, 1) for keys %{$self->{connections} || {}};

# Clean up keep-alive connections
$loop->remove($_->[1]) for @{delete $self->{cache} || []};
$loop->remove($_->[1]) for @{delete $self->{queue} || []};

return $self;
}
Expand Down Expand Up @@ -215,7 +184,8 @@ sub _connect {
return $self->_error($id, $err) if $err;

# Connection established
$stream->on(timeout => sub { $self->_error($id, 'Inactivity timeout') });
$stream->on(
timeout => sub { $self->_error($id, 'Inactivity timeout', 1) });
$stream->on(close => sub { $self->_handle($id, 1) });
$stream->on(error => sub { $self && $self->_error($id, pop) });
$stream->on(read => sub { $self->_read($id, pop) });
Expand Down Expand Up @@ -282,7 +252,7 @@ sub _connection {
# Reuse connection
my $id = $tx->connection;
my ($proto, $host, $port) = $self->transactor->endpoint($tx);
$id ||= $self->_cache("$proto:$host:$port");
$id ||= $self->_dequeue("$proto:$host:$port", 1);
if ($id && !ref $id) {
warn "-- Reusing connection ($proto:$host:$port)\n" if DEBUG;
$self->{connections}{$id} = {cb => $cb, tx => $tx};
Expand Down Expand Up @@ -313,11 +283,42 @@ sub _delegate {
return $self;
}

# Remove and return the id of the first usable keep-alive connection whose
# queue entry ([$name, $id]) matches $name, which may be an endpoint name or a
# connection id. Returns undef if nothing usable matches.
#
# With a true $test, matching connections whose stream is readable are treated
# as corrupted: they are closed and the search continues. Entries whose stream
# no longer exists in the loop are dropped silently.
#
# The queue array is modified in place and never replaced, so callers that
# still hold a reference to it (such as _enqueue while it evicts old
# connections through _remove) keep working on the live queue.
sub _dequeue {
  my ($self, $name, $test) = @_;

  my $found;
  my $loop  = $self->_loop;
  my $queue = $self->{queue} ||= [];
  my $i     = 0;
  while ($i < @$queue) {
    my $queued = $queue->[$i];

    # Neither id nor name match, keep the entry and move on
    unless (grep { $_ eq $name } @$queued) { $i++; next }

    # Drop the entry before touching the stream, so anything triggered by
    # closing it (NOTE(review): presumably the close event ends up in
    # _remove and therefore here again, confirm) sees a consistent queue
    splice @$queue, $i, 1;

    # Stream is already gone
    next unless my $stream = $loop->stream($queued->[1]);

    # Sort out corrupted connections if necessary and keep searching
    if ($test && $stream->is_readable) { $stream->close; next }

    $found = $queued->[1];
    last;
  }

  return $found;
}

# Put idle connection $id into the keep-alive queue under endpoint $name,
# closing the oldest queued connections first so that no more than
# max_connections are retained. Nothing is queued when max_connections is 0.
sub _enqueue {
  my ($self, $name, $id) = @_;

  # Enforce connection limit. The queue is looked up again for every
  # eviction, because _remove may install a different array in
  # $self->{queue} (it calls _dequeue), and a reference taken before the
  # call would then point at a dead array
  my $max = $self->max_connections;
  while (1) {
    my $queue = $self->{queue} ||= [];
    last unless @$queue && @$queue >= $max;
    $self->_remove(shift(@$queue)->[1]);
  }

  push @{$self->{queue} ||= []}, [$name, $id] if $max;
}

sub _error {
my ($self, $id, $err) = @_;
my ($self, $id, $err, $timeout) = @_;
if (my $tx = $self->{connections}{$id}{tx}) { $tx->res->error($err) }
else { return $self->emit(error => $err) }
$self->_handle($id, !!$err);
elsif (!$timeout) { return $self->emit(error => $err) }
$self->_handle($id, 1);
}

sub _handle {
Expand Down Expand Up @@ -378,12 +379,12 @@ sub _remove {
# Close connection
my $tx = (delete($self->{connections}{$id}) || {})->{tx};
if ($close || !$tx || !$tx->keep_alive || $tx->error) {
$self->_cache($id);
$self->_dequeue($id);
return $self->_loop->remove($id);
}

# Keep connection alive (CONNECT requests get upgraded)
$self->_cache(join(':', $self->transactor->endpoint($tx)), $id)
$self->_enqueue(join(':', $self->transactor->endpoint($tx)), $id)
unless uc $tx->req->method eq 'CONNECT';
}

Expand Down Expand Up @@ -670,7 +671,7 @@ Local address to bind to.
$ua = $ua->max_connections(5);
Maximum number of keep-alive connections that the user agent will retain
before it starts closing the oldest cached ones, defaults to C<5>.
before it starts closing the oldest ones, defaults to C<5>.
=head2 max_redirects
Expand Down
13 changes: 13 additions & 0 deletions t/mojo/user_agent.t
Expand Up @@ -275,6 +275,19 @@ $tx = $ua->get('/timeout?timeout=5');
ok !$tx->success, 'not successful';
is $tx->error, 'Inactivity timeout', 'right error';

# Keep alive connection times out
my $id;
$ua->get(
'/' => sub {
my ($ua, $tx) = @_;
Mojo::IOLoop->timer(0.5 => sub { Mojo::IOLoop->stop });
$id = $tx->connection;
Mojo::IOLoop->stream($id)->timeout(0.25);
}
);
Mojo::IOLoop->start;
ok !Mojo::IOLoop->stream($id), 'connection timed out';

# Response exceeding message size limit
$ua->once(
start => sub {
Expand Down

0 comments on commit abd72f8

Please sign in to comment.