Skip to content

Commit

Permalink
move the _write methods to the channel classes
Browse files Browse the repository at this point in the history
  • Loading branch information
jberger committed Jan 9, 2016
1 parent 4b74d88 commit 8082f45
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 135 deletions.
96 changes: 96 additions & 0 deletions lib/Mojo/Channel/HTTP.pm
@@ -1,4 +1,100 @@
package Mojo::Channel::HTTP;
use Mojo::Base 'Mojo::Channel';

sub is_server { 0 }

sub write {
my $self = shift;
my $server = $self->is_server;
my $tx = $self->{tx};

# Client starts writing right away
$tx->{state} ||= 'write' unless $server;
return '' unless $tx->{state} eq 'write';

# Nothing written yet
$tx->{$_} ||= 0 for qw(offset write);
my $msg = $server ? $tx->res : $tx->req;
@$tx{qw(http_state write)} = ('start_line', $msg->start_line_size)
unless $tx->{http_state};

# Start-line
my $chunk = '';
$chunk .= $self->_start_line($msg) if $tx->{http_state} eq 'start_line';

# Headers
$chunk .= $self->_headers($msg, $server) if $tx->{http_state} eq 'headers';

# Body
$chunk .= $self->_body($msg, $server) if $tx->{http_state} eq 'body';

return $chunk;
}

sub _body {
my ($self, $msg, $finish) = @_;
my $tx = $self->{tx};

# Prepare body chunk
my $buffer = $msg->get_body_chunk($tx->{offset});
my $written = defined $buffer ? length $buffer : 0;
$tx->{write} = $msg->content->is_dynamic ? 1 : ($tx->{write} - $written);
$tx->{offset} += $written;
if (defined $buffer) { delete $self->{delay} }

# Delayed
elsif (delete $tx->{delay}) { $tx->{state} = 'paused' }
else { $tx->{delay} = 1 }

# Finished
$tx->{state} = $finish ? 'finished' : 'read'
if $tx->{write} <= 0 || defined $buffer && $buffer eq '';

return defined $buffer ? $buffer : '';
}

sub _headers {
my ($self, $msg, $head) = @_;
my $tx = $self->{tx};

# Prepare header chunk
my $buffer = $msg->get_header_chunk($tx->{offset});
my $written = defined $buffer ? length $buffer : 0;
$tx->{write} -= $written;
$tx->{offset} += $written;

# Switch to body
if ($tx->{write} <= 0) {
$tx->{offset} = 0;

# Response without body
if ($head && $tx->is_empty) { $tx->{state} = 'finished' }

# Body
else {
$tx->{http_state} = 'body';
$tx->{write} = $msg->content->is_dynamic ? 1 : $msg->body_size;
}
}

return $buffer;
}

sub _start_line {
my ($self, $msg) = @_;
my $tx = $self->{tx};

# Prepare start-line chunk
my $buffer = $msg->get_start_line_chunk($tx->{offset});
my $written = defined $buffer ? length $buffer : 0;
$tx->{write} -= $written;
$tx->{offset} += $written;

# Switch to headers
@$tx{qw(http_state write offset)} = ('headers', $msg->header_size, 0)
if $tx->{write} <= 0;

return $buffer;
}

1;
2 changes: 2 additions & 0 deletions lib/Mojo/Channel/HTTP/Server.pm
@@ -1,6 +1,8 @@
package Mojo::Channel::HTTP::Server;
use Mojo::Base 'Mojo::Channel::HTTP';

sub is_server { 1 }

sub read {
my ($self, $chunk) = @_;

Expand Down
14 changes: 14 additions & 0 deletions lib/Mojo/Channel/WebSocket.pm
Expand Up @@ -14,4 +14,18 @@ sub read {
$tx->emit('resume');
}


sub write {
my $self = shift;
my $tx = $self->{tx};

unless (length($tx->{write} // '')) {
$tx->{state} = $tx->{finished} ? 'finished' : 'read';
$tx->emit('drain');
}

return delete $tx->{write} // '';
}


1;
2 changes: 1 addition & 1 deletion lib/Mojo/Server/Daemon.pm
Expand Up @@ -234,7 +234,7 @@ sub _write {
my $c = $self->{connections}{$id};
return unless my $tx = $c->{tx};
return if !$tx->is_writing || $c->{writing}++;
my $chunk = $tx->server_write;
my $chunk = $c->write;
delete $c->{writing};
warn term_escape "-- Server >>> Client (@{[_url($tx)]})\n$chunk\n" if DEBUG;
my $stream = $self->ioloop->stream($id)->write($chunk);
Expand Down
21 changes: 0 additions & 21 deletions lib/Mojo/Transaction.pm
Expand Up @@ -27,8 +27,6 @@ sub client_close {
$self->server_close;
}

sub client_write { croak 'Method "client_write" not implemented by subclass' }

sub connection {
my $self = shift;
return $self->emit(connection => $self->{connection} = shift) if @_;
Expand Down Expand Up @@ -58,8 +56,6 @@ sub remote_address {
sub resume { shift->_state(qw(write resume)) }
sub server_close { shift->_state(qw(finished finish)) }

sub server_write { croak 'Method "server_write" not implemented by subclass' }

sub success { $_[0]->error ? undef : $_[0]->res }

sub _state {
Expand All @@ -81,9 +77,6 @@ Mojo::Transaction - Transaction base class
package Mojo::Transaction::MyTransaction;
use Mojo::Base 'Mojo::Transaction';
sub client_write {...}
sub server_write {...}
=head1 DESCRIPTION
L<Mojo::Transaction> is an abstract base class for transactions, like
Expand Down Expand Up @@ -187,13 +180,6 @@ implements the following new ones.
Transaction closed client-side, no actual connection close is assumed by
default, used to implement user agents such as L<Mojo::UserAgent>.
=head2 client_write
my $bytes = $tx->client_write;
Write data client-side, used to implement user agents such as
L<Mojo::UserAgent>. Meant to be overloaded in a subclass.
=head2 connection
my $id = $tx->connection;
Expand Down Expand Up @@ -257,13 +243,6 @@ proxy.
Transaction closed server-side, used to implement web servers such as
L<Mojo::Server::Daemon>.
=head2 server_write
my $bytes = $tx->server_write;
Write data server-side, used to implement web servers such as
L<Mojo::Server::Daemon>. Meant to be overloaded in a subclass.
=head2 success
my $res = $tx->success;
Expand Down
93 changes: 0 additions & 93 deletions lib/Mojo/Transaction/HTTP.pm
Expand Up @@ -3,8 +3,6 @@ use Mojo::Base 'Mojo::Transaction';

has [qw(next previous)];

sub client_write { shift->_write(0) }

sub is_empty { !!(uc $_[0]->req->method eq 'HEAD' || $_[0]->res->is_empty) }

sub keep_alive {
Expand Down Expand Up @@ -32,97 +30,6 @@ sub redirects {
return \@redirects;
}

sub server_write { shift->_write(1) }

sub _body {
my ($self, $msg, $finish) = @_;

# Prepare body chunk
my $buffer = $msg->get_body_chunk($self->{offset});
my $written = defined $buffer ? length $buffer : 0;
$self->{write} = $msg->content->is_dynamic ? 1 : ($self->{write} - $written);
$self->{offset} += $written;
if (defined $buffer) { delete $self->{delay} }

# Delayed
elsif (delete $self->{delay}) { $self->{state} = 'paused' }
else { $self->{delay} = 1 }

# Finished
$self->{state} = $finish ? 'finished' : 'read'
if $self->{write} <= 0 || defined $buffer && $buffer eq '';

return defined $buffer ? $buffer : '';
}

sub _headers {
my ($self, $msg, $head) = @_;

# Prepare header chunk
my $buffer = $msg->get_header_chunk($self->{offset});
my $written = defined $buffer ? length $buffer : 0;
$self->{write} -= $written;
$self->{offset} += $written;

# Switch to body
if ($self->{write} <= 0) {
$self->{offset} = 0;

# Response without body
if ($head && $self->is_empty) { $self->{state} = 'finished' }

# Body
else {
$self->{http_state} = 'body';
$self->{write} = $msg->content->is_dynamic ? 1 : $msg->body_size;
}
}

return $buffer;
}

sub _start_line {
my ($self, $msg) = @_;

# Prepare start-line chunk
my $buffer = $msg->get_start_line_chunk($self->{offset});
my $written = defined $buffer ? length $buffer : 0;
$self->{write} -= $written;
$self->{offset} += $written;

# Switch to headers
@$self{qw(http_state write offset)} = ('headers', $msg->header_size, 0)
if $self->{write} <= 0;

return $buffer;
}

sub _write {
my ($self, $server) = @_;

# Client starts writing right away
$self->{state} ||= 'write' unless $server;
return '' unless $self->{state} eq 'write';

# Nothing written yet
$self->{$_} ||= 0 for qw(offset write);
my $msg = $server ? $self->res : $self->req;
@$self{qw(http_state write)} = ('start_line', $msg->start_line_size)
unless $self->{http_state};

# Start-line
my $chunk = '';
$chunk .= $self->_start_line($msg) if $self->{http_state} eq 'start_line';

# Headers
$chunk .= $self->_headers($msg, $server) if $self->{http_state} eq 'headers';

# Body
$chunk .= $self->_body($msg, $server) if $self->{http_state} eq 'body';

return $chunk;
}

1;

=encoding utf8
Expand Down
13 changes: 0 additions & 13 deletions lib/Mojo/Transaction/WebSocket.pm
Expand Up @@ -95,8 +95,6 @@ sub build_message {
return $self->build_frame(@$frame);
}

sub client_write { shift->server_write(@_) }

sub connection { shift->handshake->connection }

sub finish {
Expand Down Expand Up @@ -209,17 +207,6 @@ sub server_close {

sub server_open { shift->{open}++ }

sub server_write {
my $self = shift;

unless (length($self->{write} // '')) {
$self->{state} = $self->{finished} ? 'finished' : 'read';
$self->emit('drain');
}

return delete $self->{write} // '';
}

sub with_compression {
my $self = shift;

Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/UserAgent.pm
Expand Up @@ -330,7 +330,7 @@ sub _write {
my $c = $self->{connections}{$id};
return unless my $tx = $c->{tx};
return if !$tx->is_writing || $c->{writing}++;
my $chunk = $tx->client_write;
my $chunk = $c->write;
delete $c->{writing};
warn term_escape "-- Client >>> Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
my $stream = $c->{ioloop}->stream($id)->write($chunk);
Expand Down
6 changes: 0 additions & 6 deletions t/mojo/transactor.t
Expand Up @@ -919,10 +919,4 @@ is $tx->req->body, '', 'no content';
is $tx->res->code, undef, 'no status';
is $tx->res->headers->location, undef, 'no "Location" value';

# Abstract methods
eval { Mojo::Transaction->client_write };
like $@, qr/Method "client_write" not implemented by subclass/, 'right error';
eval { Mojo::Transaction->server_write };
like $@, qr/Method "server_write" not implemented by subclass/, 'right error';

done_testing();

0 comments on commit 8082f45

Please sign in to comment.