Skip to content

Commit

Permalink
replaced one_tick method in Mojo::IOWatcher with start and stop methods
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Oct 11, 2011
1 parent 5714998 commit c3d472d
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 106 deletions.
2 changes: 2 additions & 0 deletions Changes
Expand Up @@ -4,6 +4,8 @@ This file documents the revision history for Perl extension Mojolicious.
- Code name "Leaf Fluttering In Wind", this is a major release.
- Increased Perl version requirement to 5.10.1.
- Renamed Mojo::IOLoop::EventEmitter to Mojo::EventEmitter.
- Replaced one_tick method in Mojo::IOWatcher with start and stop
methods.
- Added EXPERIMENTAL on_body attribute to Mojo::Content.
- Added EXPERIMENTAL contains method to Mojo::Path.
- Added EXPERIMENTAL auto_upgrade attribute to Mojo::Content::Single.
Expand Down
55 changes: 30 additions & 25 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -229,30 +229,9 @@ sub on_error { shift->_event(error => @_) }
# Delegate to "_event" with the event name "read", passing every remaining
# argument through unchanged
sub on_read {
  my $self = shift;
  return $self->_event(read => @_);
}

sub one_tick {
my ($self, $timeout) = @_;
$timeout //= $self->timeout;

# Housekeeping
$self->_listening;
my $connections = $self->{connections} ||= {};
while (my ($id, $c) = each %$connections) {

# Connection needs to be finished
if ($c->{finish} && (!$c->{stream} || $c->{stream}->is_finished)) {
$self->_drop($id);
next;
}

# Connection timeout
$self->_drop($id)
if (time - ($c->{active} || time)) >= ($c->{timeout} || 15);
}

# Graceful shutdown
$self->stop if $self->max_connections == 0 && keys %$connections == 0;

# Watcher
$self->iowatcher->one_tick($timeout);
my $self = shift;
$self->timer(shift // $self->timeout => sub { shift->stop });
$self->start;
}

sub recurring {
Expand All @@ -278,7 +257,9 @@ sub start {
croak 'Mojo::IOLoop already running' if $self->{running}++;

# Mainloop
$self->one_tick while $self->{running};
my $id = $self->recurring(0 => sub { shift->_manage });
$self->iowatcher->start;
$self->drop($id);

return $self;
}
Expand All @@ -300,6 +281,7 @@ sub stop {
my $self = shift;
$self = $self->singleton unless ref $self;
delete $self->{running};
$self->iowatcher->stop;
}

sub test {
Expand Down Expand Up @@ -396,6 +378,29 @@ sub _listening {
$self->{listening} = 1;
}

# Housekeeping pass: calls "_listening", drops connections that are finished
# or have been inactive for too long, and stops the loop once
# "max_connections" is 0 and no connections remain
sub _manage {
  my ($self) = @_;

  $self->_listening;

  # Keep iterating with "each", "_drop" is called for the current id only
  my $active = $self->{connections} ||= {};
  while (my ($cid, $conn) = each %$active) {
    my $stream = $conn->{stream};

    # Finish was requested and there is no stream, or the stream reports
    # that it is finished
    my $done = $conn->{finish} && (!$stream || $stream->is_finished);

    # Otherwise check inactivity against the connection timeout (15 if
    # unset), a connection without an "active" timestamp has an age of 0
    $self->_drop($cid)
      if $done
        || (time - ($conn->{active} || time)) >= ($conn->{timeout} || 15);
  }

  # Graceful shutdown
  if ($self->max_connections == 0 && keys(%$active) == 0) { $self->stop }
}

sub _not_listening {
my $self = shift;

Expand Down
39 changes: 27 additions & 12 deletions lib/Mojo/IOWatcher.pm
Expand Up @@ -60,20 +60,20 @@ sub not_writing {
}

# "This was such a pleasant St. Patrick's Day until Irish people showed up."
sub one_tick {
my ($self, $timeout) = @_;
sub _one_tick {
my $self = shift;

# I/O
my $poll = $self->_poll;
$poll->poll($timeout);
$poll->poll('0.025');
my $handles = $self->{handles};
$self->_sandbox('Read', $handles->{fileno $_}->{on_readable}, $_)
for $poll->handles(POLLIN | POLLHUP | POLLERR);
$self->_sandbox('Write', $handles->{fileno $_}->{on_writable}, $_)
for $poll->handles(POLLOUT);

# Wait for timeout
usleep 1000000 * $timeout unless keys %{$self->{handles}};
usleep 1000000 * '0.025' unless keys %{$self->{handles}};

# Timers
my $timers = $self->{timers} || {};
Expand Down Expand Up @@ -104,6 +104,14 @@ sub remove {
return $self;
}

# Increment the "running" flag and keep calling "_one_tick" for as long as
# the flag stays true
sub start {
  my ($self) = @_;
  $self->{running}++;
  while ($self->{running}) { $self->_one_tick }
}

# Remove the "running" flag from the watcher, the deleted value is returned
sub stop {
  my ($self) = @_;
  return delete $self->{running};
}

# "Bart, how did you get a cellphone?
# The same way you got me, by accident on a golf course."
# Call "_timer" with the callback (last argument), the delay as "after"
# (the argument before it) and the current time as "started"
sub timer {
  my $self  = shift;
  my $cb    = pop;
  my $after = pop;
  return $self->_timer($cb, after => $after, started => time);
}
Expand Down Expand Up @@ -160,8 +168,9 @@ Mojo::IOWatcher - Non-blocking I/O watcher
say "Timeout!";
});
# And loop!
$watcher->one_tick('0.25') while 1;
# Start and stop watcher
$watcher->start;
$watcher->stop;
=head1 DESCRIPTION
Expand Down Expand Up @@ -222,12 +231,6 @@ sockets.
Only watch handle for readable events.
=head2 C<one_tick>
$watcher->one_tick('0.25');
Run for exactly one tick and watch for I/O and timer events.
=head2 C<recurring>
my $id = $watcher->recurring(3 => sub {...});
Expand All @@ -241,6 +244,18 @@ amount of seconds.
Remove handle.
=head2 C<start>
$watcher->start;
Start watching for I/O and timer events.
=head2 C<stop>
$watcher->stop;
Stop watching for I/O and timer events.
=head2 C<timer>
my $id = $watcher->timer(3 => sub {...});
Expand Down
34 changes: 19 additions & 15 deletions lib/Mojo/IOWatcher/EV.pm
@@ -1,7 +1,7 @@
package Mojo::IOWatcher::EV;
use Mojo::Base 'Mojo::IOWatcher';

use EV;
use EV 4.0;
use Scalar::Util 'weaken';

my $EV;
Expand All @@ -25,14 +25,6 @@ sub not_writing {
return $self;
}

# "Wow, Barney. You brought a whole beer keg.
# Yeah... where do I fill it up?"
sub one_tick {
my ($self, $timeout) = @_;
my $w = EV::timer($timeout, 0, sub { EV::unloop(EV::BREAK_ONE) });
EV::loop;
}

# Call "_timer" with the first argument, the flag 1 and all remaining
# arguments
sub recurring {
  my $self  = shift;
  my $after = shift;
  return $self->_timer($after, 1, @_);
}

sub remove {
Expand All @@ -41,6 +33,12 @@ sub remove {
return $self;
}

# "Wow, Barney. You brought a whole beer keg.
# Yeah... where do I fill it up?"
# Run the EV event loop and return whatever "EV::run" returns
sub start {
  return EV::run();
}

# Break out of the EV event loop by calling "EV::break" with "EV::BREAK_ONE"
sub stop {
  return EV::break(EV::BREAK_ONE());
}

# Call "_timer" with the first argument, the flag 0 and all remaining
# arguments
sub timer {
  my $self  = shift;
  my $after = shift;
  return $self->_timer($after, 0, @_);
}

sub writing {
Expand Down Expand Up @@ -124,12 +122,6 @@ Construct a new L<Mojo::IOWatcher::EV> object.
Only watch handle for readable events.
=head2 C<one_tick>
$watcher->one_tick('0.25');
Run for exactly one tick and watch for I/O and timer events.
=head2 C<recurring>
my $id = $watcher->recurring(3 => sub {...});
Expand All @@ -143,6 +135,18 @@ amount of seconds.
Remove handle.
=head2 C<start>
$watcher->start;
Start watching for I/O and timer events.
=head2 C<stop>
$watcher->stop;
Stop watching for I/O and timer events.
=head2 C<timer>
my $id = $watcher->timer(3 => sub {...});
Expand Down
1 change: 0 additions & 1 deletion lib/Mojo/UserAgent.pm
Expand Up @@ -129,7 +129,6 @@ sub start {
# Start loop
my $loop = $self->ioloop;
$loop->start;
$loop->one_tick(0);

return $tx;
}
Expand Down
41 changes: 23 additions & 18 deletions t/mojo/ioloop_tls.t
Expand Up @@ -34,7 +34,7 @@ plan skip_all => 'set TEST_TLS to enable this test (developer only!)'
unless $ENV{TEST_TLS};
plan skip_all => 'IO::Socket::SSL 1.37 required for this test!'
unless Mojo::IOLoop::Server::TLS;
plan tests => 16;
plan tests => 20;

use Mojo::IOLoop;

Expand All @@ -50,14 +50,12 @@ $loop->listen(
on_accept => sub {
shift->write(shift, 'test', sub { shift->write(shift, '321') });
},
on_close => sub { $server .= 'close' },
on_read => sub { $server .= pop }
on_read => sub { $server .= pop }
);
my $id = $loop->connect(
address => 'localhost',
port => $port,
tls => 1,
on_close => sub { $client .= 'close' },
on_connect => sub {
shift->write(shift, 'tset', sub { shift->write(shift, '123') });
},
Expand All @@ -72,6 +70,7 @@ is $client, 'test321', 'right content';
$loop = Mojo::IOLoop->singleton;
$port = Mojo::IOLoop->generate_port;
$server = $client = '';
my $server_close = my $client_close = 0;
my ($drop, $running);
Mojo::IOLoop->drop(Mojo::IOLoop->recurring(0 => sub { $drop++ }));
my $error = '';
Expand All @@ -85,17 +84,17 @@ $loop->listen(
shift->write(shift, 'test', sub { shift->write(shift, '321') });
$running = Mojo::IOLoop->is_running;
},
on_close => sub { $server .= 'close' },
on_close => sub { $server_close++ },
on_error => sub { $error = pop },
on_read => sub { $server .= pop }
on_read => sub { $server .= pop }
);
$id = $loop->connect(
address => 'localhost',
port => $port,
tls => 1,
tls_cert => 't/mojo/certs/client.crt',
tls_key => 't/mojo/certs/client.key',
on_close => sub { shift->stop },
on_close => sub { $client_close++ },
on_connect => sub {
shift->write(shift, 'tset', sub { shift->write(shift, '123') });
},
Expand All @@ -104,9 +103,11 @@ $id = $loop->connect(
$loop->connection_timeout($id => '0.5');
$loop->timer(1 => sub { shift->stop });
$loop->start;
is $server, 'tset123close', 'right content';
is $client, 'test321', 'right content';
ok $running, 'loop was running';
is $server, 'tset123', 'right content';
is $client, 'test321', 'right content';
ok $server_close, 'close event has been emitted';
ok $client_close, 'close event has been emitted';
ok $running, 'loop was running';
ok !$drop, 'event dropped successfully';
ok !$error, 'no error';

Expand Down Expand Up @@ -153,9 +154,10 @@ ok !$error, 'no error';
ok $cerror, 'has error';

# Valid client certificate accepted by callback
$loop = Mojo::IOLoop->new;
$port = Mojo::IOLoop->generate_port;
$server = $client = '';
$loop = Mojo::IOLoop->new;
$port = Mojo::IOLoop->generate_port;
$server = $client = '';
$server_close = $client_close = 0;
$loop->listen(
port => $port,
tls => 1,
Expand All @@ -166,17 +168,17 @@ $loop->listen(
on_accept => sub {
shift->write(shift, 'test', sub { shift->write(shift, '321') });
},
on_close => sub { $server .= 'close' },
on_close => sub { $server_close++ },
on_error => sub { $error = pop },
on_read => sub { $server .= pop }
on_read => sub { $server .= pop }
);
$id = $loop->connect(
address => 'localhost',
port => $port,
tls => 1,
tls_cert => 't/mojo/certs/client.crt',
tls_key => 't/mojo/certs/client.key',
on_close => sub { shift->stop },
on_close => sub { $client_close++ },
on_connect => sub {
shift->write(shift, 'tset', sub { shift->write(shift, '123') });
},
Expand All @@ -185,8 +187,10 @@ $id = $loop->connect(
$loop->connection_timeout($id => '0.5');
$loop->timer(1 => sub { shift->stop });
$loop->start;
is $server, 'tset123close', 'right content';
is $client, 'test321', 'right content';
is $server, 'tset123', 'right content';
is $client, 'test321', 'right content';
ok $server_close, 'close event has been emitted';
ok $client_close, 'close event has been emitted';

# Missing client certificate
$error = $cerror = '';
Expand All @@ -197,6 +201,7 @@ $id = $loop->connect(
on_error => sub { $cerror = pop }
);
$loop->connection_timeout($id => '0.5');
$loop->timer(1 => sub { shift->stop });
$loop->start;
ok !$error, 'no error';
ok $cerror, 'has error';
Expand Down

0 comments on commit c3d472d

Please sign in to comment.