Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
move lots of client functionality over to the channel
  • Loading branch information
jberger committed Jan 9, 2016
1 parent 62ad0a0 commit 04f4a60
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 26 deletions.
108 changes: 108 additions & 0 deletions lib/Mojo/Channel/HTTP.pm
@@ -0,0 +1,108 @@
package Mojo::Channel::HTTP;

use Mojo::Base -base;

has 'tx';

# Closing the channel simply closes the transaction it wraps
sub close {
  my $self = shift;
  return $self->tx->close;
}

# The base channel reports itself as not being a server
sub is_server { return 0 }

# Produce the next chunk of the outgoing message (the response when
# is_server is true, the request otherwise), walking through the start-line,
# header and body stages. Returns '' when the transaction is not in the
# "write" state.
sub write {
  my $self      = shift;
  my $is_server = $self->is_server;
  my $tx        = $self->tx;

  # A client begins in the "write" state if no state has been set yet
  if (!$is_server && !$tx->{state}) { $tx->{state} = 'write' }
  return '' if $tx->{state} ne 'write';

  # Initialize the counters on first use
  for my $key (qw(offset write)) { $tx->{$key} ||= 0 }
  my $message = $is_server ? $tx->res : $tx->req;

  # First call, begin with the start-line
  if (!$tx->{http_state}) {
    my $size = $message->start_line_size;
    $tx->{http_state} = 'start_line';
    $tx->{write}      = $size;
  }

  # Each stage may advance "http_state", so one call can emit several stages
  my $chunk = '';
  if ($tx->{http_state} eq 'start_line') {
    $chunk .= $self->_start_line($message);
  }
  if ($tx->{http_state} eq 'headers') {
    $chunk .= $self->_headers($message, $is_server);
  }
  if ($tx->{http_state} eq 'body') {
    $chunk .= $self->_body($message, $is_server);
  }

  return $chunk;
}

# Fetch the next body chunk from the message and update the transaction's
# write bookkeeping. $finish selects the state entered once the body is done
# ("finished" when true, "read" otherwise). Always returns a defined string.
sub _body {
  my ($self, $msg, $finish) = @_;
  my $tx = $self->tx;

  # Next piece of the body, undef means nothing is available right now
  my $chunk     = $msg->get_body_chunk($tx->{offset});
  my $has_chunk = defined $chunk;
  my $length    = $has_chunk ? length $chunk : 0;

  # Dynamic content has no known size, so keep the remaining counter positive
  if   ($msg->content->is_dynamic) { $tx->{write} = 1 }
  else                             { $tx->{write} = $tx->{write} - $length }
  $tx->{offset} += $length;

  # The delay flag is cleared on every call; without a chunk the first miss
  # sets it again and the second miss in a row pauses the transaction
  my $was_delayed = delete $tx->{delay};
  if (!$has_chunk) {
    if   ($was_delayed) { $tx->{state} = 'paused' }
    else                { $tx->{delay} = 1 }
  }

  # Done once everything has been written or an empty chunk was returned
  my $done = $tx->{write} <= 0 || ($has_chunk && $chunk eq '');
  if ($done) { $tx->{state} = $finish ? 'finished' : 'read' }

  return $has_chunk ? $chunk : '';
}

# Fetch the next header chunk from the message and update the transaction's
# write bookkeeping. Once all header bytes are written, either finish the
# transaction (empty response) or switch to the body stage.
#
#   $msg  - message whose headers are being written
#   $head - true when an empty transaction may finish without a body
#           (write() passes its is_server flag here)
#
# Returns the value produced by get_header_chunk for the current offset.
sub _headers {
  my ($self, $msg, $head) = @_;
  my $tx = $self->tx;

  # Prepare header chunk
  my $buffer = $msg->get_header_chunk($tx->{offset});
  my $written = defined $buffer ? length $buffer : 0;
  $tx->{write} -= $written;
  $tx->{offset} += $written;

  # Switch to body
  if ($tx->{write} <= 0) {
    $tx->{offset} = 0;

    # Response without body ("is_empty" is a transaction method, the channel
    # itself does not provide it, so it has to be called on $tx)
    if ($head && $tx->is_empty) { $tx->{state} = 'finished' }

    # Body
    else {
      $tx->{http_state} = 'body';
      $tx->{write} = $msg->content->is_dynamic ? 1 : $msg->body_size;
    }
  }

  return $buffer;
}

# Fetch the next start-line chunk from the message and update the
# transaction's write bookkeeping, moving on to the header stage once the
# whole start-line has been written. Returns the chunk as produced by
# get_start_line_chunk.
sub _start_line {
  my ($self, $msg) = @_;
  my $tx = $self->tx;

  # Next piece of the start-line
  my $chunk = $msg->get_start_line_chunk($tx->{offset});
  my $sent  = defined $chunk ? length $chunk : 0;
  $tx->{write}  -= $sent;
  $tx->{offset} += $sent;

  # Nothing left, continue with the headers from offset zero
  if ($tx->{write} <= 0) {
    my $size = $msg->header_size;
    $tx->{http_state} = 'headers';
    $tx->{write}      = $size;
    $tx->{offset}     = 0;
  }

  return $chunk;
}

1;

21 changes: 19 additions & 2 deletions lib/Mojo/Channel/HTTP/Client.pm
@@ -1,8 +1,25 @@
package Mojo::Channel::HTTP::Client;

use Mojo::Base -base;
use Mojo::Base 'Mojo::Channel::HTTP';

has [qw/ioloop tx/];
has 'ioloop';

# Finish the response, record an error on it where appropriate and then
# delegate to the parent class. $close is true when the connection itself
# was closed.
sub close {
  my ($self, $close) = @_;

  my $res = $self->tx->res->finish;

  # Connection went away before any status code or error was recorded
  my $premature = $close && !$res->code && !$res->error;
  if ($premature) {
    $res->error({message => 'Premature connection close'});
  }

  # Responses in the 4xx and 5xx classes are reported as errors
  elsif ($res->is_status_class(400) || $res->is_status_class(500)) {
    $res->error({message => $res->message, code => $res->code});
  }

  return $self->SUPER::close;
}

sub read {
my ($self, $chunk) = @_;
Expand Down
8 changes: 8 additions & 0 deletions lib/Mojo/Channel/HTTP/Server.pm
@@ -0,0 +1,8 @@
package Mojo::Channel::HTTP::Server;

use Mojo::Base 'Mojo::Channel::HTTP';

# The server channel reports itself as a server
sub is_server { return 1 }

1;

18 changes: 15 additions & 3 deletions lib/Mojo/Channel/WebSocket.pm
@@ -1,21 +1,33 @@
package Mojo::Channel::WebSocket;

use Mojo::Base -base;
use Mojo::Base 'Mojo::Channel::HTTP';

has [qw/ioloop tx/];

# Append an incoming chunk (or '' when it is undefined) to a read buffer, let
# the transaction parse frames out of that buffer, emit a "frame" event for
# each parsed frame and finally emit "resume" on the transaction.
# NOTE(review): this span comes from a rendered diff without +/- markers; the
# two "$self->{read}" lines look like the pre-change form of the two
# "$tx->{read}" lines that follow them - confirm against the real file.
sub read {
my ($self, $chunk) = @_;
my $tx = $self->tx;

$self->{read} .= $chunk // '';
while (my $frame = $tx->parse_frame(\$self->{read})) {
# NOTE(review): presumably the replacement for the two lines above, keeping
# the buffer on the transaction instead of the channel - verify
$tx->{read} .= $chunk // '';
while (my $frame = $tx->parse_frame(\$tx->{read})) {
# A parse result that is not a reference triggers finish(1009) and, if that
# call returns true, leaves the loop
$tx->finish(1009) and last unless ref $frame;
$tx->emit(frame => $frame);
}

$tx->emit('resume');
}

# Hand over whatever is waiting in the transaction's write buffer. When the
# buffer is empty the state is updated and "drain" is emitted first, so data
# queued during that event is still included in the returned chunk.
sub write {
  my ($self) = @_;
  my $tx = $self->tx;

  # Nothing pending, update the state and announce the drain
  my $pending = $tx->{write};
  if (!defined $pending || $pending eq '') {
    $tx->{state} = $tx->{finished} ? 'finished' : 'read';
    $tx->emit('drain');
  }

  # Take the buffer only now, after any "drain" handling
  my $chunk = delete $tx->{write};
  return defined $chunk ? $chunk : '';
}

1;

19 changes: 2 additions & 17 deletions lib/Mojo/Transaction.pm
Expand Up @@ -10,26 +10,11 @@ has [
has req => sub { Mojo::Message::Request->new };
has res => sub { Mojo::Message::Response->new };

# Finish the response, record an error on it where appropriate and then
# continue with server_close. $close is true when the connection itself was
# closed.
sub client_close {
  my ($self, $close) = @_;

  my $res = $self->res->finish;

  # Connection went away before any status code or error was recorded
  my $premature = $close && !$res->code && !$res->error;
  if ($premature) {
    $res->error({message => 'Premature connection close'});
  }

  # Responses in the 4xx and 5xx classes are reported as errors
  elsif ($res->is_status_class(400) || $res->is_status_class(500)) {
    $res->error({message => $res->message, code => $res->code});
  }

  return $self->server_close;
}

# Placeholder that croaks until a subclass provides an implementation
sub client_read {
  croak 'Method "client_read" not implemented by subclass';
}
# Placeholder that croaks until a subclass provides an implementation
sub client_write {
  croak 'Method "client_write" not implemented by subclass';
}

# Delegate to _state with the arguments "finished" and "finish"
sub close {
  my $self = shift;
  return $self->_state('finished', 'finish');
}

sub connection {
my $self = shift;
return $self->emit(connection => $self->{connection} = shift) if @_;
Expand Down
2 changes: 0 additions & 2 deletions lib/Mojo/Transaction/HTTP.pm
Expand Up @@ -3,8 +3,6 @@ use Mojo::Base 'Mojo::Transaction';

has [qw(next previous)];

# Delegate to _write with a false flag
sub client_write {
  my $self = shift;
  return $self->_write(0);
}

# True when the request method is HEAD (compared case-insensitively) or the
# response reports itself as empty; always returns a plain boolean
sub is_empty {
  my $self = shift;
  return !!1 if uc $self->req->method eq 'HEAD';
  return !!$self->res->is_empty;
}

sub keep_alive {
Expand Down
4 changes: 2 additions & 2 deletions lib/Mojo/UserAgent.pm
Expand Up @@ -230,7 +230,7 @@ sub _finish {
$c->ioloop->remove($c->{timeout}) if $c->{timeout};

return $self->_reuse($id, $close) unless my $old = $c->tx;
$old->client_close($close);
$c->close($close);

# Finish WebSocket
return $self->_remove($id) if $old->is_websocket;
Expand Down Expand Up @@ -331,7 +331,7 @@ sub _write {
return unless my $c = $self->{connections}{$id};
return unless my $tx = $c->tx;
return if !$tx->is_writing || $c->{writing}++;
my $chunk = $tx->client_write;
my $chunk = $c->write;
delete $c->{writing};
warn term_escape "-- Client >>> Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
my $stream = $c->ioloop->stream($id)->write($chunk);
Expand Down

0 comments on commit 04f4a60

Please sign in to comment.