Skip to content

Commit

Permalink
fixed one_tick semantics to be equal in Mojo::Reactor::Poll and Mojo:…
Browse files Browse the repository at this point in the history
…:Reactor::EV
  • Loading branch information
kraih committed Mar 23, 2012
1 parent cbc7a29 commit 5c922b9
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 46 deletions.
3 changes: 3 additions & 0 deletions Changes
Expand Up @@ -2,6 +2,9 @@ This file documents the revision history for Perl extension Mojolicious.

2.66 2012-03-23
- Reformatted "Changes".
- Improved Mojo::Reactor::Poll performance.
- Fixed one_tick semantics to be equal in Mojo::Reactor::Poll and
Mojo::Reactor::EV.

2.65 2012-03-22
- Deprecated Mojo::IOLoop->drop in favor of Mojo::IOLoop->remove.
Expand Down
9 changes: 3 additions & 6 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -496,8 +496,9 @@ Check if loop is running.
Mojo::IOLoop->one_tick;
$loop->one_tick;
Run reactor for roughly one tick. Note that this method can recurse back into
the reactor, so you need to be careful.
Run reactor until at least one event has been handled or no events are being
watched anymore. Note that this method can recurse back into the reactor, so
you need to be careful.
=head2 C<recurring>
Expand All @@ -507,10 +508,6 @@ the reactor, so you need to be careful.
Create a new recurring timer, invoking the callback repeatedly after a given
amount of time in seconds.
# Run multiple reactors next to each other
my $loop2 = Mojo::IOLoop->new;
Mojo::IOLoop->recurring(0 => sub { $loop2->one_tick });
=head2 C<remove>
Mojo::IOLoop->remove($id);
Expand Down
8 changes: 6 additions & 2 deletions lib/Mojo/Reactor.pm
Expand Up @@ -95,6 +95,9 @@ Detect and load the best reactor implementation available, will try the value
of the C<MOJO_REACTOR> environment variable, L<Mojo::Reactor::EV> or
L<Mojo::Reactor::Poll>.
# Instantiate best reactor implementation available
my $reactor = Mojo::Reactor->detect->new;
=head2 C<io>
$reactor = $reactor->io($handle => sub {...});
Expand Down Expand Up @@ -125,8 +128,9 @@ Check if reactor is running.
$reactor->one_tick;
Run reactor for roughly one tick. Note that this method can recurse back into
the reactor, so you need to be careful.
Run reactor until at least one event has been handled or no events are being
watched anymore. Note that this method can recurse back into the reactor, so
you need to be careful.
=head2 C<recurring>
Expand Down
15 changes: 8 additions & 7 deletions lib/Mojo/Reactor/EV.pm
Expand Up @@ -13,17 +13,17 @@ sub new { $EV++ ? Mojo::Reactor::Poll->new : shift->SUPER::new }

sub is_running {EV::depth}

sub one_tick { EV::run(EV::RUN_NOWAIT) }
sub one_tick { EV::run(EV::RUN_ONCE) }

sub recurring { shift->_timer(shift, 1, @_) }
sub recurring { shift->_timer(1, @_) }

# "Wow, Barney. You brought a whole beer keg.
# Yeah... where do I fill it up?"
sub start {EV::run}

sub stop { EV::break(EV::BREAK_ALL) }

sub timer { shift->_timer(shift, 0, @_) }
sub timer { shift->_timer(0, @_) }

sub watch {
my ($self, $handle, $read, $write) = @_;
Expand Down Expand Up @@ -54,10 +54,10 @@ sub _io {

# "It's great! We can do *anything* now that Science has invented Magic."
sub _timer {
my ($self, $after, $recurring, $cb) = @_;
my ($self, $recurring, $after, $cb) = @_;
$after ||= '0.0001';

my $id = $self->SUPER::_timer($cb);
my $id = $self->SUPER::_timer(0, 0, $cb);
weaken $self;
$self->{timers}->{$id}->{watcher} = EV::timer(
$after,
Expand Down Expand Up @@ -129,8 +129,9 @@ Check if reactor is running.
$reactor->one_tick;
Run reactor for roughly one tick. Note that this method can recurse back into
the reactor, so you need to be careful.
Run reactor until at least one event has been handled or no events are being
watched anymore. Note that this method can recurse back into the reactor, so
you need to be careful.
=head2 C<recurring>
Expand Down
72 changes: 41 additions & 31 deletions lib/Mojo/Reactor/Poll.pm
Expand Up @@ -2,6 +2,7 @@ package Mojo::Reactor::Poll;
use Mojo::Base 'Mojo::Reactor';

use IO::Poll qw/POLLERR POLLHUP POLLIN POLLOUT/;
use List::Util 'min';
use Mojo::Util 'md5_sum';
use Time::HiRes qw/time usleep/;

Expand All @@ -25,38 +26,51 @@ sub one_tick {
my $running = $self->{running};
$self->{running} = 1;

# I/O
my $poll = $self->_poll;
$poll->poll(0.025);
$self->_sandbox('Read', $self->{io}->{fileno $_}->{cb}, 0)
for $poll->handles(POLLIN | POLLHUP | POLLERR);
$self->_sandbox('Write', $self->{io}->{fileno $_}->{cb}, 1)
for $poll->handles(POLLOUT);
# Wait for one event
my $i = 0;
while (!$i) {

# Stop automatically if there is nothing to watch
return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};

# Calculate ideal timeout based on timers
my $min = min map { $_->{time} } values %{$self->{timers}};
my $timeout = defined $min ? ($min - time) : 0.025;
$timeout = 0 if $timeout < 0;

# I/O
if (keys %{$self->{io}}) {
my $poll = $self->_poll;
$poll->poll($timeout);
++$i and $self->_sandbox('Read', $self->{io}->{fileno $_}->{cb}, 0)
for $poll->handles(POLLIN | POLLHUP | POLLERR);
++$i and $self->_sandbox('Write', $self->{io}->{fileno $_}->{cb}, 1)
for $poll->handles(POLLOUT);
}

# Wait for timeout
usleep 25000 unless keys %{$self->{io}};
# Wait for timeout
elsif ($timeout) { usleep $timeout * 1000000 }

# Timers
while (my ($id, $t) = each %{$self->{timers} || {}}) {
my $after = $t->{after} || 0;
if ($after <= time - ($t->{started} || $t->{recurring} || 0)) {

# Normal timer
if ($t->{started}) { $self->remove($id) }
# Timers
while (my ($id, $t) = each %{$self->{timers} || {}}) {
next unless $t->{time} <= time;

# Recurring timer
elsif ($after && $t->{recurring}) { $t->{recurring} += $after }
if (exists $t->{recurring}) { $t->{time} = time + $t->{recurring} }

# Normal timer
else { $self->remove($id) }

# Handle timer
if (my $cb = $t->{cb}) { $self->_sandbox("Timer $id", $cb) }
++$i and $self->_sandbox("Timer $id", $t->{cb}) if $t->{cb};
}
}

# Restore state if necessary
$self->{running} = $running if $self->{running};
}

sub recurring { shift->_timer(pop, after => pop, recurring => time) }
sub recurring { shift->_timer(1, @_) }

sub remove {
my ($self, $remove) = @_;
Expand All @@ -68,17 +82,14 @@ sub remove {
sub start {
my $self = shift;
return if $self->{running}++;
while ($self->{running}) {
$self->one_tick;
$self->stop unless keys(%{$self->{timers}}) || keys(%{$self->{io}});
}
$self->one_tick while $self->{running};
}

sub stop { delete shift->{running} }

# "Bart, how did you get a cellphone?
# The same way you got me, by accident on a golf course."
sub timer { shift->_timer(pop, after => pop, started => time) }
sub timer { shift->_timer(0, @_) }

sub watch {
my ($self, $handle, $read, $write) = @_;
Expand All @@ -103,13 +114,11 @@ sub _sandbox {
}

sub _timer {
my ($self, $cb) = (shift, shift);

my $t = {cb => $cb, @_};
my ($self, $recurring, $after, $cb) = @_;
my $id;
do { $id = md5_sum('t' . time . rand 999) } while $self->{timers}->{$id};
$self->{timers}->{$id} = $t;

my $t = $self->{timers}->{$id} = {cb => $cb, time => time + $after};
$t->{recurring} = $after if $recurring;
return $id;
}

Expand Down Expand Up @@ -171,8 +180,9 @@ Check if reactor is running.
$reactor->one_tick;
Run reactor for roughly one tick. Note that this method can recurse back into
the reactor, so you need to be careful.
Run reactor until at least one event has been handled or no events are being
watched anymore. Note that this method can recurse back into the reactor, so
you need to be careful.
=head2 C<recurring>
Expand Down

0 comments on commit 5c922b9

Please sign in to comment.