Skip to content

Commit

Permalink
simplify connection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Jul 11, 2014
1 parent 639e5c8 commit f7ea445
Showing 1 changed file with 25 additions and 31 deletions.
56 changes: 25 additions & 31 deletions lib/Mojo/UserAgent.pm
Expand Up @@ -76,7 +76,7 @@ sub _cleanup {

# Clean up active connections (by closing them)
delete $self->{pid};
$self->_handle($_, 1) for keys %{$self->{connections} || {}};
$self->_finish($_, 1) for keys %{$self->{connections} || {}};

# Clean up keep-alive connections
$loop->remove($_->[1]) for @{delete $self->{queue} || []};
Expand Down Expand Up @@ -111,7 +111,7 @@ sub _connect {
# Connection established
$stream->on(
timeout => sub { $self->_error($id, 'Inactivity timeout', 1) });
$stream->on(close => sub { $self->_handle($id, 1) });
$stream->on(close => sub { $self->_finish($id, 1) });
$stream->on(error => sub { $self && $self->_error($id, pop) });
$stream->on(read => sub { $self->_read($id, pop) });
$self->$cb;
Expand Down Expand Up @@ -235,43 +235,37 @@ sub _error {
$tx->res->error({message => $err});
}
elsif (!$timeout) { return $self->emit(error => $err) }
$self->_handle($id, 1);
$self->_finish($id, 1);
}

sub _handle {
sub _finish {
my ($self, $id, $close) = @_;

# Remove request timeout
my $c = $self->{connections}{$id};
return unless my $loop = $self->_loop($c->{nb});
$loop->remove($c->{timeout}) if $c->{timeout};

return $self->_remove($id, $close) unless my $old = $c->{tx};
$old->client_close($close);

# Finish WebSocket
my $old = $c->{tx};
if ($old && $old->is_websocket) {
if ($old->is_websocket) {
delete $self->{connections}{$id};
$self->_remove($id, $close);
$old->client_close;
return $self->_remove($id, $close);
}

if (my $jar = $self->cookie_jar) { $jar->extract($old) }

# Upgrade connection to WebSocket
elsif ($old && (my $new = $self->_upgrade($id))) {
if (my $jar = $self->cookie_jar) { $jar->extract($old) }
$old->client_close;
if (my $new = $self->_upgrade($id)) {
$c->{cb}->($self, $new);
$new->client_read($old->res->content->leftovers);
return $new->client_read($old->res->content->leftovers);
}

# Finish normal connection
else {
$self->_remove($id, $close);
return unless $old;
if (my $jar = $self->cookie_jar) { $jar->extract($old) }
$old->client_close($close);

# Handle redirects
$c->{cb}->($self, $new || $old) unless $self->_redirect($c, $old);
}
# Finish normal connection and handle redirects
$self->_remove($id, $close);
$c->{cb}->($self, $old) unless $self->_redirect($c, $old);
}

sub _loop { $_[1] ? Mojo::IOLoop->singleton : $_[0]->ioloop }
Expand All @@ -286,10 +280,17 @@ sub _read {
# Process incoming data
warn "-- Client <<< Server (@{[$tx->req->url->to_abs]})\n$chunk\n" if DEBUG;
$tx->client_read($chunk);
if ($tx->is_finished) { $self->_handle($id) }
if ($tx->is_finished) { $self->_finish($id) }
elsif ($c->{tx}->is_writing) { $self->_write($id) }
}

sub _redirect {
my ($self, $c, $old) = @_;
return undef unless my $new = $self->transactor->redirect($old);
return undef unless @{$old->redirects} < $self->max_redirects;
return $self->_start($c->{nb}, $new, delete $c->{cb});
}

sub _remove {
my ($self, $id, $close) = @_;

Expand All @@ -307,13 +308,6 @@ sub _remove {
unless uc $tx->req->method eq 'CONNECT';
}

sub _redirect {
my ($self, $c, $old) = @_;
return undef unless my $new = $self->transactor->redirect($old);
return undef unless @{$old->redirects} < $self->max_redirects;
return $self->_start($c->{nb}, $new, delete $c->{cb});
}

sub _start {
my ($self, $nb, $tx, $cb) = @_;

Expand Down Expand Up @@ -361,7 +355,7 @@ sub _write {
delete $c->{writing};
warn "-- Client >>> Server (@{[$tx->req->url->to_abs]})\n$chunk\n" if DEBUG;
my $stream = $self->_loop($c->{nb})->stream($id)->write($chunk);
$self->_handle($id) if $tx->is_finished;
$self->_finish($id) if $tx->is_finished;

# Continue writing
return unless $tx->is_writing;
Expand Down

0 comments on commit f7ea445

Please sign in to comment.