Skip to content

Commit

Permalink
move _write to channel
Browse files Browse the repository at this point in the history
  • Loading branch information
jberger committed Dec 14, 2015
1 parent c267384 commit 8a6a0be
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 27 deletions.
23 changes: 23 additions & 0 deletions lib/Mojo/Channel/HTTP/Client.pm
Expand Up @@ -2,7 +2,30 @@ package Mojo::Channel::HTTP::Client;

use Mojo::Base 'Mojo::EventEmitter';

use Mojo::Util 'term_escape';
use Scalar::Util 'weaken';

use constant DEBUG => $ENV{MOJO_USERAGENT_DEBUG};

has [qw/ioloop tx/];

sub write {
my ($self, $id) = @_;
return unless my $tx = $self->tx;
return if !$tx->is_writing || $self->{writing}++;
my $chunk = $tx->client_write;
delete $self->{writing};
warn term_escape "-- Client >>> Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
my $stream = $self->ioloop->stream($id)->write($chunk);
$self->emit(finished => $id) if $tx->is_finished;

# Continue writing
return unless $tx->is_writing;
weaken $self;
$stream->write('' => sub { $self->write($id) if $self });
}

sub _url { shift->req->url->to_abs }

1;

36 changes: 9 additions & 27 deletions lib/Mojo/UserAgent.pm
Expand Up @@ -165,9 +165,9 @@ sub _connected {
$tx->remote_address($handle->peerhost)->remote_port($handle->peerport);

# Start writing
weaken $self;
$tx->on(resume => sub { $self->_write($id) });
$self->_write($id);
weaken $c;
$tx->on(resume => sub { $c->write($id) });
$c->write($id);
}

sub _connection {
Expand Down Expand Up @@ -238,8 +238,8 @@ sub _finish {

# Upgrade connection to WebSocket
if (my $new = $self->transactor->upgrade($old)) {
weaken $self;
$new->on(resume => sub { $self->_write($id) });
weaken $c;
$new->on(resume => sub { $c->write($id) });
$c->{cb}($self, $c->tx($new)->tx);
return $new->client_read($old->res->content->leftovers);
}
Expand All @@ -260,7 +260,7 @@ sub _read {
warn term_escape "-- Client <<< Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
$tx->client_read($chunk);
if ($tx->is_finished) { $self->_finish($id) }
elsif ($tx->is_writing) { $self->_write($id) }
elsif ($tx->is_writing) { $c->write($id) }
}

sub _redirect {
Expand Down Expand Up @@ -307,37 +307,19 @@ sub _start {

# Connect and add request timeout if necessary
my $id = $self->emit(start => $tx)->_connection($loop, $tx, $cb);
my $c = $self->{connections}{$id};
weaken $self;
if (my $timeout = $self->request_timeout) {
weaken $self;
my $c = $self->{connections}{$id};
$c->{timeout} = $c->ioloop
->timer($timeout => sub { $self->_error($id, 'Request timeout') });
}
$c->on(finish => sub { $self->_finish($id) });

return $id;
}

sub _url { shift->req->url->to_abs }

sub _write {
my ($self, $id) = @_;

# Get and write chunk
return unless my $c = $self->{connections}{$id};
return unless my $tx = $c->tx;
return if !$tx->is_writing || $c->{writing}++;
my $chunk = $tx->client_write;
delete $c->{writing};
warn term_escape "-- Client >>> Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
my $stream = $c->ioloop->stream($id)->write($chunk);
$self->_finish($id) if $tx->is_finished;

# Continue writing
return unless $tx->is_writing;
weaken $self;
$stream->write('' => sub { $self->_write($id) });
}

1;

=encoding utf8
Expand Down

0 comments on commit 8a6a0be

Please sign in to comment.