Skip to content

Commit

Permalink
better congestion control for Mojo::IOLoop::Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Jan 31, 2016
1 parent fad35ff commit a8a640e
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 11 deletions.
4 changes: 3 additions & 1 deletion Changes
@@ -1,8 +1,10 @@

6.43 2016-01-31
6.43 2016-02-01
- Removed client_close and server_close methods from Mojo::Transaction.
- Added high_water_mark attribute to Mojo::IOLoop::Stream.
- Added closed method to Mojo::Transaction.
- Added parse_message method to Mojo::Transaction::WebSocket.
- Added congestion event to Mojo::IOLoop::Stream.
- Improved a few examples to avoid timing attacks.
- Fixed timing bug in Mojo::Server::Daemon.

Expand Down
24 changes: 21 additions & 3 deletions lib/Mojo/IOLoop/Stream.pm
Expand Up @@ -6,6 +6,7 @@ use Mojo::IOLoop;
use Mojo::Util;
use Scalar::Util 'weaken';

has high_water_mark => 16777216;

This comment has been minimized.

Copy link
@jhthorsen

jhthorsen Jan 31, 2016

Member

Why is the default exactly this value?

This comment has been minimized.

Copy link
@kraih

kraih Jan 31, 2016

Author Member

It's an arbitrary choice.

This comment has been minimized.

Copy link
@jhthorsen

jhthorsen Jan 31, 2016

Member

Ok.

has reactor => sub { Mojo::IOLoop->singleton->reactor };

sub DESTROY { Mojo::Util::_global_destruction() or shift->close }
Expand Down Expand Up @@ -80,10 +81,10 @@ sub write {
my ($self, $chunk, $cb) = @_;

$self->{buffer} .= $chunk;
if ($cb) { $self->once(drain => $cb) }
elsif ($self->{buffer} eq '') { return $self }
$self->once(drain => $cb) if $cb;
$self->emit('congestion') if length $self->{buffer} >= $self->high_water_mark;
$self->reactor->watch($self->{handle}, !$self->{paused}, 1)
if $self->{handle};
if $self->{handle} && ($cb || $self->{buffer} ne '');

return $self;
}
Expand Down Expand Up @@ -172,6 +173,15 @@ emit the following new ones.
Emitted if the stream gets closed.
=head2 congestion
$stream->on(congestion => sub {
my $stream = shift;
...
});
Emitted if the data waiting to be written reaches L</"high_water_mark">.
=head2 drain
$stream->on(drain => sub {
Expand Down Expand Up @@ -222,6 +232,14 @@ Emitted if new data has been written to the stream.
L<Mojo::IOLoop::Stream> implements the following attributes.
=head2 high_water_mark
my $size = $stream->high_water_mark;
$stream = $stream->high_water_mark(1024);
Maximum size in bytes of data waiting to be written before the L</"congestion">
event will be emitted, defaults to C<16777216> (16MB).
=head2 reactor
my $reactor = $stream->reactor;
Expand Down
10 changes: 4 additions & 6 deletions lib/Mojo/Server/Daemon.pm
Expand Up @@ -116,6 +116,8 @@ sub _close {
delete $self->{connections}{$id};
}

sub _debug { $_[0]->app->log->debug($_[2]) if $_[0]->{connections}{$_[1]}{tx} }

sub _finish {
my ($self, $id) = @_;

Expand Down Expand Up @@ -185,15 +187,11 @@ sub _listen {
$stream->timeout($self->inactivity_timeout);

$stream->on(close => sub { $self && $self->_close($id) });
$stream->on(congestion => sub { $self->_debug($id, 'Write congestion') });
$stream->on(error =>
sub { $self && $self->app->log->error(pop) && $self->_close($id) });
$stream->on(read => sub { $self->_read($id => pop) });
$stream->on(
timeout => sub {
$self->app->log->debug('Inactivity timeout')
if $self->{connections}{$id}{tx};
}
);
$stream->on(timeout => sub { $self->_debug($id, 'Inactivity timeout') });
}
);

Expand Down
2 changes: 1 addition & 1 deletion lib/Mojolicious/Controller.pm
Expand Up @@ -811,7 +811,7 @@ establish the WebSocket connection.
use Mojo::WebSocket 'WS_PING';
$c->send([1, 0, 0, 0, WS_PING, 'Hello World!']);
# Make sure previous message has been written before continuing
# Make sure the first message has been written before continuing
$c->send('First message!' => sub {
my $c = shift;
$c->send('Second message!');
Expand Down
15 changes: 15 additions & 0 deletions lib/Mojolicious/Guides/FAQ.pod
Expand Up @@ -261,6 +261,21 @@ will force the affected worker to be restarted after a timeout. This timeout
defaults to C<20> seconds and can be extended with the attribute
L<Mojo::Server::Prefork/"heartbeat_timeout"> if your application requires it.

=head2 What does "Write congestion" mean?

All built-in web servers are based on an event-loop, and can therefore only
write data when the event-loop is in control. So if you are writing a lot of
data at once to your WebSocket, it all has to be buffered, which can be very
inefficient. So you will be notified once the data waiting to be written reaches
a certain size, which defaults to 16MB, and can be changed for each connection
individually with L<Mojo::IOLoop::Stream/"high_water_mark">. To avoid this, all
you have to do, is to wait for all data to be written, before sending more.

$c->send('First message!' => sub {
my $c = shift;
$c->send('Second message!');
});

=head1 MORE

You can continue with L<Mojolicious::Guides> now or take a look at the
Expand Down
30 changes: 30 additions & 0 deletions t/mojo/ioloop.t
Expand Up @@ -256,6 +256,36 @@ ok length($server) > length($server_after), 'stream has been resumed';
is $client, $client_after, 'stream was writable while paused';
is $client, 'works!', 'full message has been written';

# Congestion
my $congestion;
$client = '';
$id = Mojo::IOLoop->server(
{address => '127.0.0.1'} => sub {
my ($loop, $stream) = @_;
$stream->write('a');
$stream->high_water_mark(3);
$stream->on(congestion => sub { $congestion++ });
$stream->write('b');
$stream->write('c');
$stream->write(
'd' => sub {
shift->write('e' => sub { shift->close });
}
);
}
);
$port = Mojo::IOLoop->acceptor($id)->port;
Mojo::IOLoop->client(
{port => $port} => sub {
my ($loop, $err, $stream) = @_;
$stream->on(read => sub { $client .= pop });
$stream->on(close => sub { Mojo::IOLoop->stop });
}
);
Mojo::IOLoop->start;
is $client, 'abcde', 'full message has been written';
is $congestion, 2, 'congestion event has been emitted twice';

# Graceful shutdown
$err = '';
$loop = Mojo::IOLoop->new;
Expand Down
18 changes: 18 additions & 0 deletions t/mojo/websocket.t
Expand Up @@ -115,6 +115,12 @@ websocket '/timeout' => sub {
->on(finish => sub { shift->stash->{finished}++ });
};

websocket '/congestion' => sub {
my $c = shift;
Mojo::IOLoop->stream($c->tx->connection)->high_water_mark(5);
$c->send('works!')->finish;
};

# URL for WebSocket
my $ua = app->ua;
my $res = $ua->get('/link')->success;
Expand Down Expand Up @@ -392,6 +398,18 @@ is $stash->{finished}, 1, 'finish event has been emitted once';
like $log, qr/Inactivity timeout/, 'right log message';
app->log->unsubscribe(message => $msg);

# Congestion
$log = '';
$msg = app->log->on(message => sub { $log .= pop });
$ua->websocket(
'/congestion' => sub {
pop->on(finish => sub { Mojo::IOLoop->stop });
}
);
Mojo::IOLoop->start;
like $log, qr/Write congestion/, 'right log message';
app->log->unsubscribe(message => $msg);

# Ping/pong
my $pong;
$ua->websocket(
Expand Down

0 comments on commit a8a640e

Please sign in to comment.