Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
added again methods to the reactors
  • Loading branch information
kraih committed May 9, 2013
1 parent fa39fde commit fe23a7e
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 32 deletions.
65 changes: 35 additions & 30 deletions lib/Mojo/IOLoop/Stream.pm
Expand Up @@ -3,13 +3,11 @@ use Mojo::Base 'Mojo::EventEmitter';

use Errno qw(EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK);
use Scalar::Util 'weaken';
use Mojo::Util 'steady_time';

# Low level event reactor. The default callback loads Mojo::IOLoop on demand
# and hands back the reactor of its global singleton.
has reactor => sub {
  require Mojo::IOLoop;
  my $loop = Mojo::IOLoop->singleton;
  return $loop->reactor;
};
has timeout => 15;

# Close the stream when the object gets destroyed.
sub DESTROY {
  my $self = shift;
  $self->close;
}

Expand Down Expand Up @@ -38,7 +36,7 @@ sub handle { shift->{handle} }

# Quick non-blocking check if the stream handle is readable. As written here,
# the check also counts as activity: it stores the current steady_time in
# "active" and calls _again before asking the reactor.
sub is_readable {
my $self = shift;
$self->{active} = steady_time;
$self->_again;
# False without a handle, otherwise whatever the reactor reports for it
return $self->{handle} && $self->reactor->is_readable($self->{handle});
}

Expand All @@ -52,22 +50,9 @@ sub start {
my $self = shift;

my $reactor = $self->reactor;
unless ($self->{timer}) {

# Timeout (ignore 0 timeout)
weaken $self;
$self->{timer} = $reactor->recurring(
1 => sub {
return unless my $timeout = $self->timeout;
my $diff = steady_time - $self->{active};
$self->emit_safe('timeout')->close if $diff >= $timeout;
}
);

$self->{active} = steady_time;
$reactor->io($self->{handle},
sub { pop() ? $self->_write : $self->_read });
}
$reactor->io($self->timeout(15)->{handle},
sub { pop() ? $self->_write : $self->_read })
unless $self->{timer};

# Resume
$reactor->watch($self->{handle}, 1, $self->is_writing)
Expand All @@ -86,6 +71,21 @@ sub steal_handle {
return delete $self->{handle};
}

# Get or set the inactivity timeout in seconds.
#
#   my $timeout = $stream->timeout;
#   $stream     = $stream->timeout(45);
#
# Setting a value always discards the currently registered timer first. A
# false value (such as 0) then leaves the stream without a timer, any other
# value registers a recurring reactor timer that emits "timeout" and closes
# the stream when it fires.
sub timeout {
  my $self = shift;

  # Getter
  return $self->{timeout} if !@_;

  # Drop the timer that belongs to the previous timeout value
  my $reactor = $self->reactor;
  if ($self->{timer}) {
    my $old = delete $self->{timer};
    $reactor->remove($old);
  }

  # Remember the new value, a false one means no timeout at all
  my $timeout = $self->{timeout} = shift;
  return $self unless $timeout;

  # The timer callback only holds a weak reference to the stream
  weaken $self;
  my $expire = sub { $self->emit_safe('timeout')->close };
  $self->{timer} = $reactor->recurring($timeout => $expire);

  return $self;
}

sub write {
my ($self, $chunk, $cb) = @_;

Expand All @@ -98,6 +98,11 @@ sub write {
return $self;
}

# Restart the inactivity timer, does nothing without a registered timer.
sub _again {
  my $self = shift;

  # No timer, hand back the false value just like the guard it replaces
  my $id = $self->{timer};
  return $id unless $id;

  return $self->reactor->again($id);
}

sub _error {
my $self = shift;

Expand All @@ -116,7 +121,7 @@ sub _read {
my $read = $self->{handle}->sysread(my $buffer, 131072, 0);
return $self->_error unless defined $read;
return $self->close if $read == 0;
$self->emit_safe(read => $buffer)->{active} = steady_time;
$self->emit_safe(read => $buffer)->_again;
}

sub _write {
Expand All @@ -127,7 +132,7 @@ sub _write {
my $written = $handle->syswrite($self->{buffer});
return $self->_error unless defined $written;
$self->emit_safe(write => substr($self->{buffer}, 0, $written, ''));
$self->{active} = steady_time;
$self->_again;
}

$self->emit_safe('drain') unless length $self->{buffer};
Expand Down Expand Up @@ -245,15 +250,6 @@ L<Mojo::IOLoop::Stream> implements the following attributes.
Low level event reactor, defaults to the C<reactor> attribute value of the
global L<Mojo::IOLoop> singleton.
=head2 timeout
my $timeout = $stream->timeout;
$stream = $stream->timeout(45);
Maximum amount of time in seconds stream can be inactive before getting closed
automatically, defaults to C<15>. Setting the value to C<0> will allow this
stream to be inactive indefinitely.
=head1 METHODS
L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and
Expand Down Expand Up @@ -314,6 +310,15 @@ Stop watching for new data on the stream.
Steal handle from stream and prevent it from getting closed automatically.
=head2 timeout
my $timeout = $stream->timeout;
$stream = $stream->timeout(45);
Maximum amount of time in seconds the stream can be inactive before getting
closed automatically, defaults to C<15>. Setting the value to C<0> will allow
this stream to be inactive indefinitely.
=head2 write
$stream = $stream->write($bytes);
Expand Down
9 changes: 9 additions & 0 deletions lib/Mojo/Reactor.pm
Expand Up @@ -5,6 +5,8 @@ use Carp 'croak';
use IO::Poll qw(POLLERR POLLHUP POLLIN);
use Mojo::Loader;

# Abstract method, reactor subclasses have to provide their own "again".
sub again {
  croak 'Method "again" not implemented by subclass';
}

sub detect {
my $try = $ENV{MOJO_REACTOR} || 'Mojo::Reactor::EV';
return Mojo::Loader->new->load($try) ? 'Mojo::Reactor::Poll' : $try;
Expand Down Expand Up @@ -46,6 +48,7 @@ Mojo::Reactor - Low level event reactor base class
$ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::MyEventLoop';
sub again {...}
sub io {...}
sub is_running {...}
sub one_tick {...}
Expand Down Expand Up @@ -84,6 +87,12 @@ Emitted safely for exceptions caught in callbacks.
L<Mojo::Reactor> inherits all methods from L<Mojo::EventEmitter> and
implements the following new ones.
=head2 again
$reactor->again($id);
Restart timer. Meant to be overloaded in a subclass.
=head2 detect
my $class = Mojo::Reactor->detect;
Expand Down
10 changes: 9 additions & 1 deletion lib/Mojo/Reactor/EV.pm
Expand Up @@ -13,6 +13,8 @@ sub DESTROY { undef $EV }
# We have to fall back to Mojo::Reactor::Poll, since EV is unique
sub new {
  my $class = shift;

  # Every call bumps the counter, only the very first one gets an EV reactor
  return Mojo::Reactor::Poll->new if $EV++;
  return $class->SUPER::new;
}

# Restart the timer with the given id by calling "again" on its watcher.
sub again {
  my ($self, $id) = @_;
  $self->{timers}{$id}{watcher}->again;
}

# Check if the reactor is running, normalized to a boolean from EV::depth.
sub is_running {
  my $depth = EV::depth();
  return !!$depth;
}

# Run EV with the RUN_ONCE flag.
sub one_tick {
  return EV::run(EV::RUN_ONCE);
}
Expand Down Expand Up @@ -59,7 +61,7 @@ sub _timer {
my $id = $self->SUPER::_timer(0, 0, $cb);
weaken $self;
$self->{timers}{$id}{watcher} = EV::timer(
$after => ($recurring ? $after : 0) => sub {
$after => $after => sub {
$self->_sandbox("Timer $id", $self->{timers}{$id}{cb});
delete $self->{timers}{$id} unless $recurring;
}
Expand Down Expand Up @@ -117,6 +119,12 @@ implements the following new ones.
Construct a new L<Mojo::Reactor::EV> object.
=head2 again
$reactor->again($id);
Restart timer.
=head2 is_running
my $success = $reactor->is_running;
Expand Down
14 changes: 13 additions & 1 deletion lib/Mojo/Reactor/Poll.pm
Expand Up @@ -6,6 +6,11 @@ use List::Util 'min';
use Mojo::Util qw(md5_sum steady_time);
use Time::HiRes 'usleep';

# Restart the timer with the given id, so it is due again "after" seconds
# from now. Returns the new due time, or nothing if there is no such timer.
sub again {
  my ($self, $id) = @_;

  # Look the timer up without autovivifying it. A stale or unknown id must
  # not leave an entry without callback and interval in the timer table.
  my $timers = $self->{timers} or return;
  return unless defined $id && exists $timers->{$id};

  my $timer = $timers->{$id};
  return $timer->{time} = steady_time() + $timer->{after};
}

sub io {
my ($self, $handle, $cb) = @_;
$self->{io}{fileno $handle} = {cb => $cb};
Expand Down Expand Up @@ -109,7 +114,8 @@ sub _timer {
my $timers = $self->{timers} //= {};
my $id;
do { $id = md5_sum('t' . steady_time . rand 999) } while $timers->{$id};
my $timer = $timers->{$id} = {cb => $cb, time => steady_time + $after};
my $timer = $timers->{$id}
= {cb => $cb, after => $after, time => steady_time + $after};
$timer->{recurring} = $after if $recurring;

return $id;
Expand Down Expand Up @@ -158,6 +164,12 @@ L<Mojo::Reactor::Poll> inherits all events from L<Mojo::Reactor>.
L<Mojo::Reactor::Poll> inherits all methods from L<Mojo::Reactor> and
implements the following new ones.
=head2 again
$reactor->again($id);
Restart timer.
=head2 io
$reactor = $reactor->io($handle => sub {...});
Expand Down

0 comments on commit fe23a7e

Please sign in to comment.