Skip to content

Commit

Permalink
added Mojo::Reactor::Poll
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Mar 22, 2012
1 parent 37a73eb commit 993e543
Show file tree
Hide file tree
Showing 52 changed files with 369 additions and 217 deletions.
1 change: 1 addition & 0 deletions Changes
Expand Up @@ -3,6 +3,7 @@ This file documents the revision history for Perl extension Mojolicious.
2.65 2012-03-22 00:00:00
- Deprecated Mojo::IOLoop->drop in favor of Mojo::IOLoop->remove.
- Renamed Mojo::Reactor->drop to Mojo::Reactor->remove.
- Added Mojo::Reactor::Poll.
- Added one_tick method to Mojo::Reactor and Mojo::Reactor::EV.
- Removed experimental status from Mojo::Reactor.
- Removed experimental status from Mojo::Reactor::EV.
Expand Down
8 changes: 4 additions & 4 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -6,7 +6,7 @@ use Mojo::IOLoop::Client;
use Mojo::IOLoop::Delay;
use Mojo::IOLoop::Server;
use Mojo::IOLoop::Stream;
use Mojo::Reactor;
use Mojo::Reactor::Poll;
use Mojo::Util 'md5_sum';
use Scalar::Util qw/blessed weaken/;
use Time::HiRes 'time';
Expand All @@ -18,7 +18,7 @@ has [qw/lock unlock/];
has max_accepts => 0;
has max_connections => 1000;
has reactor => sub {
my $class = Mojo::Reactor->detect;
my $class = Mojo::Reactor::Poll->detect;
warn "MAINLOOP ($class)\n" if DEBUG;
return $class->new;
};
Expand Down Expand Up @@ -406,8 +406,8 @@ connections.
my $reactor = $loop->reactor;
$loop = $loop->reactor(Mojo::Reactor->new);
Low level event reactor, usually a L<Mojo::Reactor> or L<Mojo::Reactor::EV>
object.
Low level event reactor, usually a L<Mojo::Reactor::Poll> or
L<Mojo::Reactor::EV> object.
=head2 C<server_class>
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/IOLoop/Client.pm
Expand Up @@ -187,7 +187,7 @@ L<Mojo::IOLoop::Client> implements the following attributes.
=head2 C<reactor>
my $reactor = $client->reactor;
$client = $client->reactor(Mojo::Reactor->new);
$client = $client->reactor(Mojo::Reactor::Poll->new);
Low level event reactor, defaults to the C<reactor> attribute value of the
global L<Mojo::IOLoop> singleton.
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/IOLoop/Server.pm
Expand Up @@ -276,7 +276,7 @@ Number of connections to accept at once, defaults to C<10>.
=head2 C<reactor>
my $reactor = $server->reactor;
$server = $server->reactor(Mojo::Reactor->new);
$server = $server->reactor(Mojo::Reactor::Poll->new);
Low level event reactor, defaults to the C<reactor> attribute value of the
global L<Mojo::IOLoop> singleton.
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/IOLoop/Stream.pm
Expand Up @@ -266,7 +266,7 @@ L<Mojo::IOLoop::Stream> implements the following attributes.
=head2 C<reactor>
my $reactor = $stream->reactor;
$stream = $stream->reactor(Mojo::Reactor->new);
$stream = $stream->reactor(Mojo::Reactor::Poll->new);
Low level event reactor, defaults to the C<reactor> attribute value of the
global L<Mojo::IOLoop> singleton.
Expand Down
163 changes: 21 additions & 142 deletions lib/Mojo/Reactor.pm
@@ -1,28 +1,19 @@
package Mojo::Reactor;
use Mojo::Base 'Mojo::EventEmitter';

use Carp 'croak';
use IO::Poll qw/POLLERR POLLHUP POLLIN POLLOUT/;
use Mojo::Loader;
use Mojo::Util 'md5_sum';
use Time::HiRes qw/time usleep/;

# "I don't know.
# Can I really betray my country?
# I say the Pledge of Allegiance every day.
# You pledge allegiance to the flag.
# And the flag is made in China."

sub detect {
my $try = $ENV{MOJO_REACTOR} || 'Mojo::Reactor::EV';
return $try unless Mojo::Loader->load($try);
return 'Mojo::Reactor';
return 'Mojo::Reactor::Poll';
}

sub io {
my ($self, $handle, $cb) = @_;
$self->{io}->{fileno $handle} = {cb => $cb};
return $self->watch($handle, 1, 1);
}
sub io { croak 'Method "io" not implemented by subclass' }

# "This was such a pleasant St. Patrick's Day until Irish people showed up."
sub is_readable {
my ($self, $handle) = @_;

Expand All @@ -35,142 +26,28 @@ sub is_readable {
return !!$result;
}

sub is_running { shift->{running} }

# "This was such a pleasant St. Patrick's Day until Irish people showed up."
sub one_tick {
my $self = shift;

# Remember state
my $running = $self->{running};
$self->{running} = 1;

# I/O
my $poll = $self->_poll;
$poll->poll(0.025);
$self->_sandbox('Read', $self->{io}->{fileno $_}->{cb}, 0)
for $poll->handles(POLLIN | POLLHUP | POLLERR);
$self->_sandbox('Write', $self->{io}->{fileno $_}->{cb}, 1)
for $poll->handles(POLLOUT);

# Wait for timeout
usleep 25000 unless keys %{$self->{io}};

# Timers
while (my ($id, $t) = each %{$self->{timers} || {}}) {
my $after = $t->{after} || 0;
if ($after <= time - ($t->{started} || $t->{recurring} || 0)) {

# Normal timer
if ($t->{started}) { $self->remove($id) }

# Recurring timer
elsif ($after && $t->{recurring}) { $t->{recurring} += $after }

# Handle timer
if (my $cb = $t->{cb}) { $self->_sandbox("Timer $id", $cb) }
}
}

# Restore state if necessary
$self->{running} = $running if $self->{running};
}

sub recurring { shift->_timer(pop, after => pop, recurring => time) }

sub remove {
my ($self, $remove) = @_;
return delete shift->{timers}->{shift()} unless ref $remove;
$self->_poll->remove($remove);
return delete $self->{io}->{fileno $remove};
}

sub start {
my $self = shift;
return if $self->{running}++;
while ($self->{running}) {
$self->one_tick;
$self->stop unless keys(%{$self->{timers}}) || keys(%{$self->{io}});
}
}

sub stop { delete shift->{running} }

# "Bart, how did you get a cellphone?
# The same way you got me, by accident on a golf course."
sub timer { shift->_timer(pop, after => pop, started => time) }

sub watch {
my ($self, $handle, $read, $write) = @_;

my $poll = $self->_poll;
$poll->remove($handle);
if ($read && $write) { $poll->mask($handle, POLLIN | POLLOUT) }
elsif ($read) { $poll->mask($handle, POLLIN) }
elsif ($write) { $poll->mask($handle, POLLOUT) }

return $self;
}

sub _poll { shift->{poll} ||= IO::Poll->new }

sub _sandbox {
my ($self, $desc, $cb) = (shift, shift, shift);
return if eval { $self->$cb(@_); 1 };
$self->once(error => sub { warn $_[1] })
unless $self->has_subscribers('error');
$self->emit_safe(error => "$desc failed: $@");
}

sub _timer {
my ($self, $cb) = (shift, shift);

my $t = {cb => $cb, @_};
my $id;
do { $id = md5_sum('t' . time . rand 999) } while $self->{timers}->{$id};
$self->{timers}->{$id} = $t;

return $id;
}
sub is_running { croak 'Method "is_running" not implemented by subclass' }
sub one_tick { croak 'Method "one_tick" not implemented by subclass' }
sub recurring { croak 'Method "recurring" not implemented by subclass' }
sub remove { croak 'Method "remove" not implemented by subclass' }
sub start { croak 'Method "start" not implemented by subclass' }
sub stop { croak 'Method "stop" not implemented by subclass' }
sub timer { croak 'Method "timer" not implemented by subclass' }
sub watch { croak 'Method "watch" not implemented by subclass' }

1;
__END__
=head1 NAME
Mojo::Reactor - Minimalistic low level event reactor
Mojo::Reactor - Low level event reactor base class
=head1 SYNOPSIS
use Mojo::Reactor;
# Watch if handle becomes readable or writable
my $reactor = Mojo::Reactor->new;
$reactor->io($handle => sub {
my ($reactor, $writable) = @_;
say $writable ? 'Handle is writable' : 'Handle is readable';
});
# Add a timer
$reactor->timer(15 => sub {
my $reactor = shift;
$reactor->remove($handle);
say 'Timeout!';
});
# Start reactor if necessary
$reactor->start unless $reactor->is_running;
=head1 DESCRIPTION
L<Mojo::Reactor> is a minimalistic low level event reactor based on
L<IO::Poll> and the foundation of L<Mojo::IOLoop>.
# A new reactor implementation could look like this
package Mojo::Reactor::MyLoop;
package Mojo::Reactor::MyEventLoop;
use Mojo::Base 'Mojo::Reactor';
$ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::MyLoop';
$ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::MyEventLoop';
sub io {...}
sub is_running {...}
Expand All @@ -184,8 +61,9 @@ L<IO::Poll> and the foundation of L<Mojo::IOLoop>.
1;
Exceptions in callbacks should be caught and emitted safely as C<error>
events with L<Mojo::EventEmitter/"emit_safe">.
=head1 DESCRIPTION
L<Mojo::Reactor> is an abstract base class for low level event reactors.
=head1 EVENTS
Expand Down Expand Up @@ -215,7 +93,8 @@ implements the following new ones.
my $class = Mojo::Reactor->detect;
Detect and load the best reactor implementation available, will try the value
of the C<MOJO_REACTOR> environment variable or L<Mojo::Reactor::EV>.
of the C<MOJO_REACTOR> environment variable, L<Mojo::Reactor::EV> or
L<Mojo::Reactor::Poll>.
=head2 C<io>
Expand Down
45 changes: 37 additions & 8 deletions lib/Mojo/Reactor/EV.pm
@@ -1,5 +1,5 @@
package Mojo::Reactor::EV;
use Mojo::Base 'Mojo::Reactor';
use Mojo::Base 'Mojo::Reactor::Poll';

use EV 4.0;
use Scalar::Util 'weaken';
Expand All @@ -8,8 +8,8 @@ my $EV;

sub DESTROY { undef $EV }

# We have to fall back to Mojo::Reactor, since EV is unique
sub new { $EV++ ? Mojo::Reactor->new : shift->SUPER::new }
# We have to fall back to Mojo::Reactor::Poll, since EV is unique
sub new { $EV++ ? Mojo::Reactor::Poll->new : shift->SUPER::new }

sub is_running {EV::depth}

Expand Down Expand Up @@ -77,26 +77,40 @@ __END__
=head1 NAME
Mojo::Reactor::EV - Minimalistic low level event reactor with libev support
Mojo::Reactor::EV - Low level event reactor with libev support
=head1 SYNOPSIS
use Mojo::Reactor::EV;
# Watch if handle becomes readable or writable
my $reactor = Mojo::Reactor::EV->new;
$reactor->io($handle => sub {
my ($reactor, $writable) = @_;
say $writable ? 'Handle is writable' : 'Handle is readable';
});
# Add a timer
$reactor->timer(15 => sub {
my $reactor = shift;
$reactor->remove($handle);
say 'Timeout!';
});
# Start reactor if necessary
$reactor->start unless $reactor->is_running;
=head1 DESCRIPTION
L<Mojo::Reactor::EV> is a minimalistic low level event reactor with C<libev>
support.
L<Mojo::Reactor::EV> is a low level event reactor based on C<libev>.
=head1 EVENTS
L<Mojo::Reactor::EV> inherits all events from L<Mojo::Reactor>.
L<Mojo::Reactor::EV> inherits all events from L<Mojo::Reactor::Poll>.
=head1 METHODS
L<Mojo::Reactor::EV> inherits all methods from L<Mojo::Reactor> and
L<Mojo::Reactor::EV> inherits all methods from L<Mojo::Reactor::Poll> and
implements the following new ones.
=head2 C<new>
Expand Down Expand Up @@ -125,6 +139,9 @@ the reactor, so you need to be careful.
Create a new recurring timer, invoking the callback repeatedly after a given
amount of time in seconds.
# Invoke as soon as possible
$reactor->recurring(0 => sub { say 'Reactor tick.' });
=head2 C<start>
$reactor->start;
Expand All @@ -151,6 +168,18 @@ seconds.
Change I/O events to watch handle for with C<true> and C<false> values.
# Watch only for readable events
$reactor->watch($handle, 1, 0);
# Watch only for writable events
$reactor->watch($handle, 0, 1);
# Watch for readable and writable events
$reactor->watch($handle, 1, 1);
# Pause watching for events
$reactor->watch($handle, 0, 0);
=head1 SEE ALSO
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
Expand Down

0 comments on commit 993e543

Please sign in to comment.