Skip to content

Commit

Permalink
port to Channel object for UserAgent
Browse files Browse the repository at this point in the history
this redoes the work prior to backporting relevant changes from an
earlier branch

still have to port to using the events
  • Loading branch information
jberger committed Dec 20, 2015
1 parent dfb031b commit 1adc855
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 20 deletions.
42 changes: 42 additions & 0 deletions lib/Mojo/Channel/HTTP/Client.pm
@@ -0,0 +1,42 @@
package Mojo::Channel::HTTP::Client;

use Mojo::Base 'Mojo::EventEmitter';

use Mojo::Util 'term_escape';
use Scalar::Util 'weaken';

use constant DEBUG => $ENV{MOJO_USERAGENT_DEBUG};

has [qw/ioloop tx/];

sub read {
my ($self, $id, $chunk) = @_;
return unless my $tx = $self->tx;

# Process incoming data
warn term_escape "-- Client <<< Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
$tx->client_read($chunk);
if ($tx->is_finished) { $self->emit(finished => $id) }
elsif ($tx->is_writing) { $self->write($id) }
}

sub write {
my ($self, $id) = @_;
return unless my $tx = $self->tx;
return if !$tx->is_writing || $self->{writing}++;
my $chunk = $tx->client_write;
delete $self->{writing};
warn term_escape "-- Client >>> Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
my $stream = $self->ioloop->stream($id)->write($chunk);
$self->emit(finished => $id) if $tx->is_finished;

# Continue writing
return unless $tx->is_writing;
weaken $self;
$stream->write('' => sub { $self->write($id) if $self });
}

sub _url { shift->req->url->to_abs }

1;

48 changes: 28 additions & 20 deletions lib/Mojo/UserAgent.pm
Expand Up @@ -3,6 +3,7 @@ use Mojo::Base 'Mojo::EventEmitter';

# "Fry: Since when is the Internet about robbing people of their privacy?
# Bender: August 6, 1991."
use Mojo::Channel::HTTP::Client;
use Mojo::IOLoop;
use Mojo::Util qw(monkey_patch term_escape);
use Mojo::UserAgent::CookieJar;
Expand Down Expand Up @@ -76,7 +77,9 @@ sub _cleanup {
}

sub _connect {
my ($self, $loop, $peer, $tx, $handle, $cb) = @_;
my ($self, $c, $peer, $handle, $cb) = @_;
my $loop = $c->ioloop;
my $tx = $c->tx;

my $t = $self->transactor;
my ($proto, $host, $port) = $peer ? $t->peer($tx) : $t->endpoint($tx);
Expand Down Expand Up @@ -140,9 +143,10 @@ sub _connect_proxy {
# TLS upgrade
my $handle = $loop->stream($id)->steal_handle;
$self->_remove($id);
$id = $self->_connect($loop, 0, $old, $handle,
my $c = Mojo::Channel::HTTP::Client->new(cb => $cb, ioloop => $loop, tx => $old);
$id = $self->_connect($c, 0, $handle,
sub { shift->_start($loop, $old->connection($id), $cb) });
$self->{connections}{$id} = {cb => $cb, ioloop => $loop, tx => $old};
$self->{connections}{$id} = $c;
}
);
}
Expand All @@ -152,10 +156,10 @@ sub _connected {

# Inactivity timeout
my $c = $self->{connections}{$id};
my $stream = $c->{ioloop}->stream($id)->timeout($self->inactivity_timeout);
my $stream = $c->ioloop->stream($id)->timeout($self->inactivity_timeout);

# Store connection information in transaction
my $tx = $c->{tx}->connection($id);
my $tx = $c->tx->connection($id);
my $handle = $stream->handle;
$tx->local_address($handle->sockhost)->local_port($handle->sockport);
$tx->remote_address($handle->peerhost)->remote_port($handle->peerport);
Expand Down Expand Up @@ -184,9 +188,10 @@ sub _connection {
if (my $id = $self->_connect_proxy($loop, $tx, $cb)) { return $id }

# Connect
$id = $self->_connect($loop, 1, $tx, undef, \&_connected);
my $c = Mojo::Channel::HTTP::Client->new(cb => $cb, ioloop => $loop, tx => $tx);
$id = $self->_connect($c, 1, undef, \&_connected);
warn "-- Connect $id ($proto://$host:$port)\n" if DEBUG;
$self->{connections}{$id} = {cb => $cb, ioloop => $loop, tx => $tx};
$self->{connections}{$id} = $c;

return $id;
}
Expand All @@ -210,7 +215,8 @@ sub _dequeue {

sub _error {
my ($self, $id, $err) = @_;
my $tx = $self->{connections}{$id}{tx};
my $c = $self->{connections}{$id};
my $tx = $c ? $c->tx : undef;
$tx->res->error({message => $err}) if $tx;
$self->_finish($id, 1);
}
Expand All @@ -220,9 +226,9 @@ sub _finish {

# Remove request timeout
return unless my $c = $self->{connections}{$id};
$c->{ioloop}->remove($c->{timeout}) if $c->{timeout};
$c->ioloop->remove($c->{timeout}) if $c->{timeout};

return $self->_reuse($id, $close) unless my $old = $c->{tx};
return $self->_reuse($id, $close) unless my $old = $c->tx;
$old->client_close($close);

# Finish WebSocket
Expand All @@ -234,7 +240,7 @@ sub _finish {
if (my $new = $self->transactor->upgrade($old)) {
weaken $self;
$new->on(resume => sub { $self->_write($id) });
$c->{cb}($self, $c->{tx} = $new);
$c->{cb}($self, $c->tx($new)->tx);
return $new->client_read($old->res->content->leftovers);
}

Expand All @@ -245,9 +251,10 @@ sub _finish {

sub _read {
my ($self, $id, $chunk) = @_;
my $c = $self->{connections}{$id};

# Corrupted connection
return $self->_remove($id) unless my $tx = $self->{connections}{$id}{tx};
return $self->_remove($id) unless my $tx = $c->tx;

# Process incoming data
warn term_escape "-- Client <<< Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
Expand All @@ -260,28 +267,29 @@ sub _redirect {
my ($self, $c, $old) = @_;
return undef unless my $new = $self->transactor->redirect($old);
return undef unless @{$old->redirects} < $self->max_redirects;
return $self->_start($c->{ioloop}, $new, delete $c->{cb});
return $self->_start($c->ioloop, $new, delete $c->{cb});
}

sub _remove {
my ($self, $id) = @_;
my $c = delete $self->{connections}{$id};
$self->_dequeue($c->{ioloop}, $id);
$c->{ioloop}->remove($id);
$self->_dequeue($c->ioloop, $id);
$c->ioloop->remove($id);
}

sub _reuse {
my ($self, $id, $close) = @_;

# Connection close
my $c = $self->{connections}{$id};
my $tx = delete $c->{tx};
my $tx = $c->tx;
$c->tx(undef);
my $max = $self->max_connections;
return $self->_remove($id)
if $close || !$tx || !$max || !$tx->keep_alive || $tx->error;

# Keep connection alive
my $queue = $self->{queue}{$c->{ioloop}} ||= [];
my $queue = $self->{queue}{$c->ioloop} ||= [];
$self->_remove(shift(@$queue)->[1]) while @$queue && @$queue >= $max;
push @$queue, [join(':', $self->transactor->endpoint($tx)), $id];
}
Expand Down Expand Up @@ -316,13 +324,13 @@ sub _write {
my ($self, $id) = @_;

# Get and write chunk
my $c = $self->{connections}{$id};
return unless my $tx = $c->{tx};
return unless my $c = $self->{connections}{$id};
return unless my $tx = $c->tx;
return if !$tx->is_writing || $c->{writing}++;
my $chunk = $tx->client_write;
delete $c->{writing};
warn term_escape "-- Client >>> Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
my $stream = $c->{ioloop}->stream($id)->write($chunk);
my $stream = $c->ioloop->stream($id)->write($chunk);
$self->_finish($id) if $tx->is_finished;

# Continue writing
Expand Down

0 comments on commit 1adc855

Please sign in to comment.