Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
improve Mojo::Reactor::EV and Mojo::Reactor::Poll to fail more consis…
…tently (closes #767)
  • Loading branch information
kraih committed Mar 20, 2015
1 parent 16baba0 commit c19f663
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 21 deletions.
2 changes: 2 additions & 0 deletions Changes
@@ -1,5 +1,7 @@

6.04 2015-03-20
- Improved Mojo::Reactor::EV and Mojo::Reactor::Poll to fail more
consistently.
- Fixed a few bugs in Mojo::DOM::CSS that required class, id and attribute
selectors, as well as pseudo-classes, to be in a specific order.

Expand Down
3 changes: 2 additions & 1 deletion lib/Mojo/Reactor.pm
Expand Up @@ -85,7 +85,8 @@ the following new ones.
$reactor->again($id);
Restart active timer. Meant to be overloaded in a subclass.
Restart timer. Meant to be overloaded in a subclass. Note that this method
requires an active timer.
=head2 detect
Expand Down
18 changes: 12 additions & 6 deletions lib/Mojo/Reactor/EV.pm
@@ -1,6 +1,7 @@
package Mojo::Reactor::EV;
use Mojo::Base 'Mojo::Reactor::Poll';

use Carp 'croak';
use EV 4.0;

my $EV;
Expand All @@ -9,7 +10,10 @@ sub CLONE { die "EV does not work with ithreads.\n" }

sub DESTROY { undef $EV }

sub again { shift->{timers}{shift()}{watcher}->again }
sub again {
croak 'Timer not active' unless my $timer = shift->{timers}{shift()};
$timer->{watcher}->again;
}

sub is_running { !!EV::depth }

Expand All @@ -29,21 +33,23 @@ sub timer { shift->_timer(0, @_) }
sub watch {
my ($self, $handle, $read, $write) = @_;

my $fd = fileno $handle;
croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};

my $mode = 0;
$mode |= EV::READ if $read;
$mode |= EV::WRITE if $write;

my $fd = fileno $handle;
if ($mode == 0) { delete $self->{io}{$fd}{watcher} }
elsif (my $w = $self->{io}{$fd}{watcher}) { $w->events($mode) }
if ($mode == 0) { delete $io->{watcher} }
elsif (my $w = $io->{watcher}) { $w->events($mode) }
else {
my $cb = sub {
my ($w, $revents) = @_;
$self->_sandbox('Read', $self->{io}{$fd}{cb}, 0) if EV::READ & $revents;
$self->_sandbox('Write', $self->{io}{$fd}{cb}, 1)
if EV::WRITE & $revents && $self->{io}{$fd};
};
$self->{io}{$fd}{watcher} = EV::io($fd, $mode, $cb);
$io->{watcher} = EV::io($fd, $mode, $cb);
}

return $self;
Expand Down Expand Up @@ -121,7 +127,7 @@ implements the following new ones.
$reactor->again($id);
Restart active timer.
Restart timer. Note that this method requires an active timer.
=head2 is_running
Expand Down
13 changes: 7 additions & 6 deletions lib/Mojo/Reactor/Poll.pm
@@ -1,13 +1,14 @@
package Mojo::Reactor::Poll;
use Mojo::Base 'Mojo::Reactor';

use Carp 'croak';
use IO::Poll qw(POLLERR POLLHUP POLLIN POLLNVAL POLLOUT POLLPRI);
use List::Util 'min';
use Mojo::Util qw(md5_sum steady_time);
use Time::HiRes 'usleep';

sub again {
my $timer = shift->{timers}{shift()};
croak 'Timer not active' unless my $timer = shift->{timers}{shift()};
$timer->{time} = steady_time + $timer->{after};
}

Expand Down Expand Up @@ -98,10 +99,10 @@ sub timer { shift->_timer(0, @_) }
sub watch {
my ($self, $handle, $read, $write) = @_;

my $mode = 0;
$mode |= POLLIN | POLLPRI if $read;
$mode |= POLLOUT if $write;
$self->{io}{fileno $handle}{mode} = $mode;
croak 'I/O watcher not active' unless my $io = $self->{io}{fileno $handle};
$io->{mode} = 0;
$io->{mode} |= POLLIN | POLLPRI if $read;
$io->{mode} |= POLLOUT if $write;

return $self;
}
Expand Down Expand Up @@ -186,7 +187,7 @@ implements the following new ones.
$reactor->again($id);
Restart active timer.
Restart timer. Note that this method requires an active timer.
=head2 io
Expand Down
21 changes: 17 additions & 4 deletions t/mojo/reactor_ev.t
Expand Up @@ -49,7 +49,8 @@ ok !$writable, 'handle is not writable';

# Accept
my $server = $listen->accept;
$reactor->remove($listen);
ok $reactor->remove($listen), 'removed';
ok !$reactor->remove($listen), 'not removed again';
($readable, $writable) = ();
$reactor->io($client => sub { pop() ? $writable++ : $readable++ });
$reactor->again($reactor->timer(0.025 => sub { shift->stop }));
Expand All @@ -58,7 +59,7 @@ ok !$readable, 'handle is not readable';
ok $writable, 'handle is writable';
print $client "hello!\n";
sleep 1;
$reactor->remove($client);
ok $reactor->remove($client), 'removed';
($readable, $writable) = ();
$reactor->io($server => sub { pop() ? $writable++ : $readable++ });
$reactor->watch($server, 1, 0);
Expand Down Expand Up @@ -94,7 +95,7 @@ ok $writable, 'handle is writable';
# Timers
my ($timer, $recurring);
$reactor->timer(0 => sub { $timer++ });
$reactor->remove($reactor->timer(0 => sub { $timer++ }));
ok $reactor->remove($reactor->timer(0 => sub { $timer++ })), 'removed';
my $id = $reactor->recurring(0 => sub { $recurring++ });
($readable, $writable) = ();
$reactor->timer(0.025 => sub { shift->stop });
Expand All @@ -118,7 +119,8 @@ ok $readable, 'handle is readable again';
ok $writable, 'handle is writable again';
ok !$timer, 'timer was not triggered';
ok $recurring, 'recurring was triggered again';
$reactor->remove($id);
ok $reactor->remove($id), 'removed';
ok !$reactor->remove($id), 'not removed again';
($readable, $writable, $timer, $recurring) = ();
$reactor->timer(0.025 => sub { shift->stop });
$reactor->start;
Expand Down Expand Up @@ -200,6 +202,17 @@ is $pair, 2, 'timer pair was triggered';
ok $single, 'single timer was triggered';
ok $last, 'timers were triggered in the right order';

# Restart inactive timer
$id = $reactor->timer(0 => sub { });
ok $reactor->remove($id), 'removed';
eval { $reactor->again($id) };
like $@, qr/Timer not active/, 'right error';

# Change inactive I/O watcher
ok !$reactor->remove($listen), 'not removed again';
eval { $reactor->watch($listen, 1, 1) };
like $@, qr!I/O watcher not active!, 'right error';

# Error
my $err;
$reactor->unsubscribe('error')->on(
Expand Down
21 changes: 17 additions & 4 deletions t/mojo/reactor_poll.t
Expand Up @@ -46,7 +46,8 @@ ok !$writable, 'handle is not writable';

# Accept
my $server = $listen->accept;
$reactor->remove($listen);
ok $reactor->remove($listen), 'removed';
ok !$reactor->remove($listen), 'not removed again';
($readable, $writable) = ();
$reactor->io($client => sub { pop() ? $writable++ : $readable++ });
$reactor->again($reactor->timer(0.025 => sub { shift->stop }));
Expand All @@ -55,7 +56,7 @@ ok !$readable, 'handle is not readable';
ok $writable, 'handle is writable';
print $client "hello!\n";
sleep 1;
$reactor->remove($client);
ok $reactor->remove($client), 'removed';
($readable, $writable) = ();
$reactor->io($server => sub { pop() ? $writable++ : $readable++ });
$reactor->watch($server, 1, 0);
Expand Down Expand Up @@ -91,7 +92,7 @@ ok $writable, 'handle is writable';
# Timers
my ($timer, $recurring);
$reactor->timer(0 => sub { $timer++ });
$reactor->remove($reactor->timer(0 => sub { $timer++ }));
ok $reactor->remove($reactor->timer(0 => sub { $timer++ })), 'removed';
my $id = $reactor->recurring(0 => sub { $recurring++ });
($readable, $writable) = ();
$reactor->timer(0.025 => sub { shift->stop });
Expand All @@ -115,7 +116,8 @@ ok $readable, 'handle is readable again';
ok $writable, 'handle is writable again';
ok !$timer, 'timer was not triggered';
ok $recurring, 'recurring was triggered again';
$reactor->remove($id);
ok $reactor->remove($id), 'removed';
ok !$reactor->remove($id), 'not removed again';
($readable, $writable, $timer, $recurring) = ();
$reactor->timer(0.025 => sub { shift->stop });
$reactor->start;
Expand Down Expand Up @@ -197,6 +199,17 @@ is $pair, 2, 'timer pair was triggered';
ok $single, 'single timer was triggered';
ok $last, 'timers were triggered in the right order';

# Restart inactive timer
$id = $reactor->timer(0 => sub { });
ok $reactor->remove($id), 'removed';
eval { $reactor->again($id) };
like $@, qr/Timer not active/, 'right error';

# Change inactive I/O watcher
ok !$reactor->remove($listen), 'not removed again';
eval { $reactor->watch($listen, 1, 1) };
like $@, qr!I/O watcher not active!, 'right error';

# Error
my $err;
$reactor->unsubscribe('error')->on(
Expand Down

0 comments on commit c19f663

Please sign in to comment.