Skip to content

Commit

Permalink
added monotonic clock support through EV
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Feb 27, 2013
1 parent d377730 commit ae00982
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 22 deletions.
18 changes: 10 additions & 8 deletions lib/Mojo/IOLoop/Stream.pm
Expand Up @@ -3,7 +3,6 @@ use Mojo::Base 'Mojo::EventEmitter';

use Errno qw(EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK);
use Scalar::Util 'weaken';
use Time::HiRes 'time';

has reactor => sub {
require Mojo::IOLoop;
Expand All @@ -13,7 +12,7 @@ has timeout => 15;

sub DESTROY { shift->close }

sub new { shift->SUPER::new(handle => shift, buffer => '', active => time) }
sub new { shift->SUPER::new(handle => shift, buffer => '') }

sub close {
my $self = shift;
Expand All @@ -31,9 +30,10 @@ sub close {
sub handle { shift->{handle} }

sub is_readable {
my $self = shift;
$self->{active} = time;
return $self->{handle} && $self->reactor->is_readable($self->{handle});
my $self = shift;
my $reactor = $self->reactor;
$self->{active} = $reactor->time;
return $self->{handle} && $reactor->is_readable($self->{handle});
}

sub is_writing {
Expand Down Expand Up @@ -92,19 +92,21 @@ sub _read {
# EOF
return $self->close if $read == 0;

$self->emit_safe(read => $buffer)->{active} = time;
$self->emit_safe(read => $buffer)->{active} = $self->reactor->time;
}

sub _startup {
my $self = shift;

# Timeout (ignore 0 timeout)
my $reactor = $self->reactor;
$self->{active} = $reactor->time;
weaken $self;
$self->{timer} = $reactor->recurring(
0.5 => sub {
return unless my $t = $self->timeout;
$self->emit_safe('timeout')->close if (time - $self->{active}) >= $t;
$self->emit_safe('timeout')->close
if ($self->reactor->time - $self->{active}) >= $t;
}
);

Expand All @@ -130,7 +132,7 @@ sub _write {
}

$self->emit_safe(write => substr($self->{buffer}, 0, $written, ''));
$self->{active} = time;
$self->{active} = $self->reactor->time;
}

$self->emit_safe('drain') if !length $self->{buffer};
Expand Down
14 changes: 12 additions & 2 deletions lib/Mojo/Reactor.pm
Expand Up @@ -4,6 +4,7 @@ use Mojo::Base 'Mojo::EventEmitter';
use Carp 'croak';
use IO::Poll qw(POLLERR POLLHUP POLLIN);
use Mojo::Loader;
use Time::HiRes ();

sub detect {
my $try = $ENV{MOJO_REACTOR} || 'Mojo::Reactor::EV';
Expand All @@ -30,8 +31,11 @@ sub recurring { croak 'Method "recurring" not implemented by subclass' }
sub remove { croak 'Method "remove" not implemented by subclass' }
sub start { croak 'Method "start" not implemented by subclass' }
sub stop { croak 'Method "stop" not implemented by subclass' }
sub timer { croak 'Method "timer" not implemented by subclass' }
sub watch { croak 'Method "watch" not implemented by subclass' }

sub time {Time::HiRes::time}

sub timer { croak 'Method "timer" not implemented by subclass' }
sub watch { croak 'Method "watch" not implemented by subclass' }

1;

Expand Down Expand Up @@ -164,6 +168,12 @@ being watched anymore. Meant to be overloaded in a subclass.
Stop watching for I/O and timer events. Meant to be overloaded in a subclass.
=head2 time
my $time = $reactor->time;
High resolution time.
=head2 timer
my $id = $reactor->timer(0.5 => sub {...});
Expand Down
8 changes: 8 additions & 0 deletions lib/Mojo/Reactor/EV.pm
Expand Up @@ -23,6 +23,8 @@ sub start {EV::run}

sub stop { EV::break(EV::BREAK_ALL) }

sub time {EV::time}

sub timer { shift->_timer(0, @_) }

sub watch {
Expand Down Expand Up @@ -150,6 +152,12 @@ called or no events are being watched anymore.
Stop watching for I/O and timer events.
=head2 time
my $time = $reactor->time;
Monotonic high resolution time.
=head2 timer
my $id = $reactor->timer(0.5 => sub {...});
Expand Down
11 changes: 6 additions & 5 deletions lib/Mojo/Reactor/Poll.pm
Expand Up @@ -4,7 +4,7 @@ use Mojo::Base 'Mojo::Reactor';
use IO::Poll qw(POLLERR POLLHUP POLLIN POLLOUT);
use List::Util 'min';
use Mojo::Util 'md5_sum';
use Time::HiRes qw(time usleep);
use Time::HiRes 'usleep';

sub io {
my ($self, $handle, $cb) = @_;
Expand All @@ -31,7 +31,7 @@ sub one_tick {

# Calculate ideal timeout based on timers
my $min = min map { $_->{time} } values %{$self->{timers}};
my $timeout = defined $min ? ($min - time) : 0.5;
my $timeout = defined $min ? ($min - $self->time) : 0.5;
$timeout = 0 if $timeout < 0;

# I/O
Expand All @@ -48,7 +48,7 @@ sub one_tick {

# Timers
while (my ($id, $t) = each %{$self->{timers} || {}}) {
next unless $t->{time} <= (my $time = time);
next unless $t->{time} <= (my $time = $self->time);

# Recurring timer
if (exists $t->{recurring}) { $t->{time} = $time + $t->{recurring} }
Expand Down Expand Up @@ -106,8 +106,9 @@ sub _timer {
my ($self, $recurring, $after, $cb) = @_;

my $id;
do { $id = md5_sum('t' . time . rand 999) } while $self->{timers}{$id};
my $t = $self->{timers}{$id} = {cb => $cb, time => time + $after};
do { $id = md5_sum('t' . $self->time . rand 999) }
while $self->{timers}{$id};
my $t = $self->{timers}{$id} = {cb => $cb, time => $self->time + $after};
$t->{recurring} = $after if $recurring;

return $id;
Expand Down
20 changes: 13 additions & 7 deletions lib/Mojo/Server/Prefork.pm
Expand Up @@ -70,7 +70,7 @@ sub run {
local $SIG{TTOU} = sub {
$self->workers($self->workers - 1) if $self->workers > 0;
return unless $self->workers;
$self->{pool}{shuffle keys %{$self->{pool}}}{graceful} ||= time;
$self->{pool}{shuffle keys %{$self->{pool}}}{graceful} ||= $self->_time;
};

# Preload application and start accepting connections
Expand All @@ -89,7 +89,8 @@ sub _heartbeat {
return unless $self->{reader}->sysread(my $chunk, 4194304);

# Update heartbeats
$self->{pool}{$1} and $self->emit(heartbeat => $1)->{pool}{$1}{time} = time
$self->{pool}{$1}
and $self->emit(heartbeat => $1)->{pool}{$1}{time} = $self->_time
while $chunk =~ /(\d+)\n/g;
}

Expand All @@ -113,17 +114,19 @@ sub _manage {
# No heartbeat (graceful stop)
my $interval = $self->heartbeat_interval;
my $timeout = $self->heartbeat_timeout;
if (!$w->{graceful} && ($w->{time} + $interval + $timeout <= time)) {
if (!$w->{graceful} && ($w->{time} + $interval + $timeout <= $self->_time))
{
$log->info("Worker $pid has no heartbeat, restarting.");
$w->{graceful} = time;
$w->{graceful} = $self->_time;
}

# Graceful stop with timeout
$w->{graceful} ||= time if $self->{graceful};
$w->{graceful} ||= $self->_time if $self->{graceful};
if ($w->{graceful}) {
$log->debug("Trying to stop worker $pid gracefully.");
kill 'QUIT', $pid;
$w->{force} = 1 if $w->{graceful} + $self->graceful_timeout <= time;
$w->{force} = 1
if $w->{graceful} + $self->graceful_timeout <= $self->_time;
}

# Normal stop
Expand Down Expand Up @@ -161,7 +164,8 @@ sub _spawn {

# Manager
die "Can't fork: $!" unless defined(my $pid = fork);
return $self->emit(spawn => $pid)->{pool}{$pid} = {time => time} if $pid;
return $self->emit(spawn => $pid)->{pool}{$pid} = {time => $self->_time}
if $pid;

# Prepare lock file
my $file = $self->{lock_file};
Expand Down Expand Up @@ -219,6 +223,8 @@ sub _term {
$self->{graceful} = 1 if $graceful;
}

sub _time { shift->ioloop->reactor->time }

1;

=head1 NAME
Expand Down

0 comments on commit ae00982

Please sign in to comment.