Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
made connection timeout handling more user-friendly
  • Loading branch information
kraih committed Dec 10, 2011
1 parent f6fed45 commit 9e09641
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 110 deletions.
13 changes: 10 additions & 3 deletions Changes
@@ -1,8 +1,15 @@
This file documents the revision history for Perl extension Mojolicious.

2.37 2011-12-09 00:00:00
2.37 2011-12-10 00:00:00
- Welcome to the Mojolicious core team Marcus Ramberg, Glen Hinkle
and Abhijit Menon-Sen.
- Removed cleanup_interval attribute from Mojo::IOLoop.
- Deprecated Mojo::IOLoop->timeout in favor of
Mojo::IOLoop::Stream->timeout.
- Added EXPERIMENTAL timeout event to Mojo::IOLoop::Stream.
- Added EXPERIMENTAL timeout attribute to Mojo::IOLoop::Stream.
- Changed default keep alive timeout of Mojo::UserAgent from 15 to 20
seconds.
- Improved documentation.
- Improved unicode tests.
- Fixed inline template double encoding bug.
Expand Down Expand Up @@ -864,7 +871,7 @@ This file documents the revision history for Perl extension Mojolicious.
- Improved Mojolicious::Plugin::I18n default lexicon handling. (yko)
- Improved a Mojo::IOLoop workaround.
- Moved all bundled static files to "lib/Mojolicious/public".
- Made Test::Mojo a little more user friendly.
- Made Test::Mojo a little more user-friendly.
- Fixed small CGI/FastCGI header generation bug.
- Fixed readonly handle support in Mojo::IOLoop.
- Fixed a Mojo::IOLoop resolver bug. (Charlie Brady)
Expand Down Expand Up @@ -1455,7 +1462,7 @@ This file documents the revision history for Perl extension Mojolicious.
- Renamed Mojolicious::Book to Mojolicious::Guides.
- Removed hot deployment support for Windows because of
incompatibilities between Active Perl and Strawberry Perl.
- Made process id and lock file defaults more userfriendly in
- Made process id and lock file defaults more user-friendly in
Mojo::Server::Daemon.
- Updated for Perl 5.12, not using the bytes pragma anymore.
- Fixed a bug where WebSocket servers could not directly start
Expand Down
80 changes: 25 additions & 55 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -21,7 +21,6 @@ has iowatcher => sub {
$watcher->on(error => sub { warn pop });
return $watcher;
};
has cleanup_interval => '0.025';
has [qw/lock unlock/];
has max_accepts => 0;
has max_connections => 1000;
Expand Down Expand Up @@ -137,13 +136,7 @@ EOF
}

# DEPRECATED in Leaf Fluttering In Wind!
sub connection_timeout {
warn <<EOF;
Mojo::IOLoop->connection_timeout is DEPRECATED in favor of
Mojo::IOLoop->timeout!
EOF
shift->timeout(@_);
}
*connection_timeout = \&timeout;

sub defer { shift->timer(0 => @_) }

Expand Down Expand Up @@ -305,8 +298,22 @@ sub start {
croak 'Mojo::IOLoop already running' if $self->{running}++;

# Mainloop
my $id =
$self->recurring($self->cleanup_interval => sub { shift->_cleanup });
my $id = $self->recurring(
'0.025' => sub {
my $self = shift;

# Manage connections
$self->_listening;
my $connections = $self->{connections} ||= {};
while (my ($id, $c) = each %$connections) {
$self->_drop($id)
if $c->{finish} && (!$c->{stream} || !$c->{stream}->is_writing);
}

# Graceful shutdown
$self->stop if $self->max_connections == 0 && keys %$connections == 0;
}
);
$self->iowatcher->start;
$self->drop($id);

Expand Down Expand Up @@ -342,19 +349,22 @@ sub stream {
weaken $self;
$stream->on(close => sub { $self->{connections}->{$id}->{finish} = 1 });
$stream->on(error => sub { $self->{connections}->{$id}->{finish} = 1 });
$stream->on(read => sub { $self->{connections}->{$id}->{active} = time });
$stream->on(write => sub { $self->{connections}->{$id}->{active} = time });
$stream->resume;

return $id;
}

# DEPRECATED in Leaf Fluttering In Wind!
sub timeout {
warn <<EOF;
Mojo::IOLoop->timeout is DEPRECATED in favor of
Mojo::IOLoop::Stream->timeout!
EOF
my ($self, $id, $timeout) = @_;
$self = $self->singleton unless ref $self;
return unless my $c = $self->{connections}->{$id};
return $c->{timeout} unless defined $timeout;
$c->{timeout} = $timeout;
return unless my $stream = $self->stream($id);
return $stream->timeout unless defined $timeout;
$stream->timeout($timeout);
return $self;
}

Expand All @@ -377,29 +387,6 @@ EOF
return $stream->write($chunk, sub { $self->$cb($id) });
}

sub _cleanup {
my $self = shift;

# Manage connections
$self->_listening;
my $connections = $self->{connections} ||= {};
while (my ($id, $c) = each %$connections) {

# Connection needs to be finished
if ($c->{finish} && (!$c->{stream} || !$c->{stream}->is_writing)) {
$self->_drop($id);
next;
}

# Connection timeout
$self->_drop($id)
if (time - ($c->{active} || time)) >= ($c->{timeout} || 15);
}

# Graceful shutdown
$self->stop if $self->max_connections == 0 && keys %$connections == 0;
}

sub _drop {
my ($self, $id) = @_;

Expand Down Expand Up @@ -553,14 +540,6 @@ Low level event watcher, usually a L<Mojo::IOWatcher> or
L<Mojo::IOWatcher::EV> object.
Note that this attribute is EXPERIMENTAL and might change without warning!
=head2 C<cleanup_interval>
my $interval = $loop->cleanup_interval;
$loop = $loop->cleanup_interval(1);
Connection cleanup interval in seconds, defaults to C<0.025>.
Note that this attribute is EXPERIMENTAL and might change without warning!
=head2 C<lock>
my $cb = $loop->lock;
Expand Down Expand Up @@ -778,15 +757,6 @@ and the loop can be restarted by running C<start> again.
Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
Note that this method is EXPERIMENTAL and might change without warning!
=head2 C<timeout>
my $timeout = Mojo::IOLoop->timeout($id);
my $timeout = $loop->timeout($id);
$loop = $loop->timeout($id => 45);
Maximum amount of time in seconds a connection can be inactive before getting
dropped, defaults to C<15>.
=head2 C<timer>
my $id = Mojo::IOLoop->timer(5 => sub {...});
Expand Down
39 changes: 37 additions & 2 deletions lib/Mojo/IOLoop/Stream.pm
Expand Up @@ -3,13 +3,15 @@ use Mojo::Base 'Mojo::EventEmitter';

use Errno qw/EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK/;
use Scalar::Util 'weaken';
use Time::HiRes 'time';

use constant CHUNK_SIZE => $ENV{MOJO_CHUNK_SIZE} || 131072;

has iowatcher => sub {
require Mojo::IOLoop;
Mojo::IOLoop->singleton->iowatcher;
};
has timeout => 15;

# "And America has so many enemies.
# Iran, Iraq, China, Mordor, the hoochies that laid low Tiger Woods,
Expand All @@ -20,6 +22,7 @@ sub DESTROY {
return unless my $watcher = $self->{iowatcher};
return unless my $handle = $self->{handle};
$watcher->drop_handle($handle);
$watcher->drop_timer($self->{timer}) if $self->{timer};
close $handle;
$self->_close;
}
Expand Down Expand Up @@ -52,10 +55,21 @@ sub pause {
sub resume {
my $self = shift;

# Timeout
my $watcher = $self->iowatcher;
weaken $self;
$self->{timer} ||= $watcher->recurring(
'0.025' => sub {
return
unless $self && (time - ($self->{active} || time)) >= $self->timeout;
$self->emit_safe('timeout') unless $self->{timed}++;
$self->_close;
}
);

# Start streaming
unless ($self->{streaming}++) {
weaken $self;
return $self->iowatcher->watch(
return $watcher->watch(
$self->{handle},
sub { $self->_read },
sub { $self->_write }
Expand Down Expand Up @@ -119,6 +133,7 @@ sub _read {

# Handle read
$self->emit_safe(read => $buffer);
$self->{active} = time;
}

# "Oh, I'm in no condition to drive. Wait a minute.
Expand Down Expand Up @@ -146,6 +161,7 @@ sub _write {

# Remove written chunk from buffer
$self->emit_safe(write => substr($self->{buffer}, 0, $written, ''));
$self->{active} = time;
}

# Handle drain
Expand Down Expand Up @@ -228,6 +244,16 @@ Emitted safely if an error happens on the stream.
Emitted safely if new data arrives on the stream.
=head2 C<timeout>
$stream->on(timeout => sub {
my $stream = shift;
});
Emitted safely if the stream has been inactive for too long and will get
closed automatically.
Note that this event is EXPERIMENTAL and might change without warning!
=head2 C<write>
$stream->on(write => sub {
Expand All @@ -248,6 +274,15 @@ L<Mojo::IOLoop::Stream> implements the following attributes.
Low level event watcher, defaults to the C<iowatcher> attribute value of the
global L<Mojo::IOLoop> singleton.
=head2 C<timeout>
my $timeout = $stream->timeout;
$stream = $stream->timeout(45);
Maximum amount of time in seconds stream can be inactive before getting
closed automatically, defaults to C<15>.
Note that this attribute is EXPERIMENTAL and might change without warning!
=head1 METHODS
L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and
Expand Down
11 changes: 9 additions & 2 deletions lib/Mojo/Server/Daemon.pm
Expand Up @@ -165,7 +165,7 @@ sub _finish {
if ($ws->res->code eq '101') {

# Upgrade connection timeout
$self->ioloop->timeout($id, $self->websocket_timeout);
$self->ioloop->stream($id)->timeout($self->websocket_timeout);

# Resume
weaken $self;
Expand Down Expand Up @@ -229,9 +229,16 @@ sub _listen {
$self->{connections}->{$id} = {tls => $tls};

# Keep alive timeout
$loop->timeout($id => $self->keep_alive_timeout);
$stream->timeout($self->keep_alive_timeout);

# Events
$stream->on(
timeout => sub {
my $c = $self->{connections}->{$id};
return unless $c->{transaction} || $c->{websocket};
$self->_error($id, 'Connection timeout.');
}
);
$stream->on(close => sub { $self->_close($id) });
$stream->on(error => sub { $self->_error($id, pop) });
$stream->on(read => sub { $self->_read($id, pop) });
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Transaction/HTTP.pm
Expand Up @@ -316,7 +316,7 @@ Emitted when a request is ready and needs to be handled.
Emitted when transaction gets upgraded to a L<Mojo::Transaction::WebSocket>
object.
Note that this event is EXPERIMENTAL and might change without warning!.
Note that this event is EXPERIMENTAL and might change without warning!
$tx->on(upgrade => sub {
my ($tx, $ws) = @_;
Expand Down

0 comments on commit 9e09641

Please sign in to comment.