Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
added monotonic clock support to Mojo::IOLoop through EV
  • Loading branch information
kraih committed Feb 27, 2013
1 parent d377730 commit 457572f
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 28 deletions.
4 changes: 3 additions & 1 deletion Changes
@@ -1,5 +1,7 @@

3.88 2013-02-26
3.88 2013-02-27
- Added monotonic clock support to Mojo::IOLoop through EV.
- Added steady_time function to Mojo::Util.
- Improved documentation.
- Improved tests.
- Fixed small domain detection bug in Mojo::UserAgent::CookieJar.
Expand Down
5 changes: 2 additions & 3 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -9,9 +9,8 @@ use Mojo::IOLoop::Delay;
use Mojo::IOLoop::Server;
use Mojo::IOLoop::Stream;
use Mojo::Reactor::Poll;
use Mojo::Util 'md5_sum';
use Mojo::Util qw(md5_sum steady_time);
use Scalar::Util 'weaken';
use Time::HiRes 'time';

use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;

Expand Down Expand Up @@ -201,7 +200,7 @@ sub _accepting {
# Generate an id from the MD5 sum of "c", the current time and a random number,
# retrying until it collides with neither "connections" nor "acceptors"
sub _id {
my $self = shift;
my $id;
do { $id = md5_sum('c' . time . rand 999) }
do { $id = md5_sum('c' . steady_time . rand 999) }
while $self->{connections}{$id} || $self->{acceptors}{$id};
return $id;
}
Expand Down
14 changes: 8 additions & 6 deletions lib/Mojo/IOLoop/Stream.pm
Expand Up @@ -2,8 +2,8 @@ package Mojo::IOLoop::Stream;
use Mojo::Base 'Mojo::EventEmitter';

use Errno qw(EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK);
use Mojo::Util 'steady_time';
use Scalar::Util 'weaken';
use Time::HiRes 'time';

has reactor => sub {
require Mojo::IOLoop;
Expand All @@ -13,7 +13,7 @@ has timeout => 15;

# Close the stream when the object gets destroyed
sub DESTROY {
  my $self = shift;
  $self->close;
}

# Construct a stream for the given handle, starting out with an empty buffer
sub new { shift->SUPER::new(handle => shift, buffer => '', active => time) }
sub new { shift->SUPER::new(handle => shift, buffer => '') }

sub close {
my $self = shift;
Expand All @@ -32,7 +32,7 @@ sub handle { shift->{handle} }

# Update the "active" timestamp, then check that a handle exists and that the
# reactor reports it as readable
sub is_readable {
my $self = shift;
$self->{active} = time;
$self->{active} = steady_time;
return $self->{handle} && $self->reactor->is_readable($self->{handle});
}

Expand Down Expand Up @@ -92,19 +92,21 @@ sub _read {
# EOF
return $self->close if $read == 0;

$self->emit_safe(read => $buffer)->{active} = time;
$self->emit_safe(read => $buffer)->{active} = steady_time;
}

sub _startup {
my $self = shift;

# Timeout (ignore 0 timeout)
my $reactor = $self->reactor;
$self->{active} = steady_time;
weaken $self;
$self->{timer} = $reactor->recurring(
0.5 => sub {
return unless my $t = $self->timeout;
$self->emit_safe('timeout')->close if (time - $self->{active}) >= $t;
$self->emit_safe('timeout')->close
if (steady_time() - $self->{active}) >= $t;
}
);

Expand All @@ -130,7 +132,7 @@ sub _write {
}

$self->emit_safe(write => substr($self->{buffer}, 0, $written, ''));
$self->{active} = time;
$self->{active} = steady_time;
}

$self->emit_safe('drain') if !length $self->{buffer};
Expand Down
13 changes: 7 additions & 6 deletions lib/Mojo/Reactor/Poll.pm
Expand Up @@ -3,8 +3,8 @@ use Mojo::Base 'Mojo::Reactor';

use IO::Poll qw(POLLERR POLLHUP POLLIN POLLOUT);
use List::Util 'min';
use Mojo::Util 'md5_sum';
use Time::HiRes qw(time usleep);
use Mojo::Util qw(md5_sum steady_time);
use Time::HiRes 'usleep';

sub io {
my ($self, $handle, $cb) = @_;
Expand All @@ -31,7 +31,7 @@ sub one_tick {

# Calculate ideal timeout based on timers
my $min = min map { $_->{time} } values %{$self->{timers}};
my $timeout = defined $min ? ($min - time) : 0.5;
my $timeout = defined $min ? ($min - steady_time) : 0.5;
$timeout = 0 if $timeout < 0;

# I/O
Expand All @@ -48,7 +48,7 @@ sub one_tick {

# Timers
while (my ($id, $t) = each %{$self->{timers} || {}}) {
next unless $t->{time} <= (my $time = time);
next unless $t->{time} <= (my $time = steady_time);

# Recurring timer
if (exists $t->{recurring}) { $t->{time} = $time + $t->{recurring} }
Expand Down Expand Up @@ -106,8 +106,9 @@ sub _timer {
my ($self, $recurring, $after, $cb) = @_;

my $id;
do { $id = md5_sum('t' . time . rand 999) } while $self->{timers}{$id};
my $t = $self->{timers}{$id} = {cb => $cb, time => time + $after};
do { $id = md5_sum('t' . steady_time . rand 999) }
while $self->{timers}{$id};
my $t = $self->{timers}{$id} = {cb => $cb, time => steady_time() + $after};
$t->{recurring} = $after if $recurring;

return $id;
Expand Down
5 changes: 3 additions & 2 deletions lib/Mojo/Server/Hypnotoad.pm
Expand Up @@ -7,6 +7,7 @@ use Cwd 'abs_path';
use File::Basename 'dirname';
use File::Spec::Functions 'catfile';
use Mojo::Server::Prefork;
use Mojo::Util 'steady_time';
use POSIX 'setsid';
use Scalar::Util 'weaken';

Expand Down Expand Up @@ -62,7 +63,7 @@ sub run {
}

# Start accepting connections
local $SIG{USR2} = sub { $self->{upgrade} ||= time };
local $SIG{USR2} = sub { $self->{upgrade} ||= steady_time };
$prefork->run;
}

Expand Down Expand Up @@ -123,7 +124,7 @@ sub _manage {

# Timeout
kill 'KILL', $self->{new}
if $self->{upgrade} + $self->{upgrade_timeout} <= time;
if $self->{upgrade} + $self->{upgrade_timeout} <= steady_time;
}
}

Expand Down
18 changes: 11 additions & 7 deletions lib/Mojo/Server/Prefork.pm
Expand Up @@ -5,6 +5,7 @@ use Fcntl ':flock';
use File::Spec::Functions qw(catfile tmpdir);
use IO::Poll 'POLLIN';
use List::Util 'shuffle';
use Mojo::Util 'steady_time';
use POSIX 'WNOHANG';
use Scalar::Util 'weaken';
use Time::HiRes ();
Expand Down Expand Up @@ -70,7 +71,7 @@ sub run {
local $SIG{TTOU} = sub {
$self->workers($self->workers - 1) if $self->workers > 0;
return unless $self->workers;
$self->{pool}{shuffle keys %{$self->{pool}}}{graceful} ||= time;
$self->{pool}{shuffle keys %{$self->{pool}}}{graceful} ||= steady_time;
};

# Preload application and start accepting connections
Expand All @@ -89,7 +90,8 @@ sub _heartbeat {
return unless $self->{reader}->sysread(my $chunk, 4194304);

# Update heartbeats
$self->{pool}{$1} and $self->emit(heartbeat => $1)->{pool}{$1}{time} = time
$self->{pool}{$1}
and $self->emit(heartbeat => $1)->{pool}{$1}{time} = steady_time
while $chunk =~ /(\d+)\n/g;
}

Expand All @@ -113,17 +115,18 @@ sub _manage {
# No heartbeat (graceful stop)
my $interval = $self->heartbeat_interval;
my $timeout = $self->heartbeat_timeout;
if (!$w->{graceful} && ($w->{time} + $interval + $timeout <= time)) {
my $time = steady_time;
if (!$w->{graceful} && ($w->{time} + $interval + $timeout <= $time)) {
$log->info("Worker $pid has no heartbeat, restarting.");
$w->{graceful} = time;
$w->{graceful} = $time;
}

# Graceful stop with timeout
$w->{graceful} ||= time if $self->{graceful};
$w->{graceful} ||= $time if $self->{graceful};
if ($w->{graceful}) {
$log->debug("Trying to stop worker $pid gracefully.");
kill 'QUIT', $pid;
$w->{force} = 1 if $w->{graceful} + $self->graceful_timeout <= time;
$w->{force} = 1 if $w->{graceful} + $self->graceful_timeout <= $time;
}

# Normal stop
Expand Down Expand Up @@ -161,7 +164,8 @@ sub _spawn {

# Manager
die "Can't fork: $!" unless defined(my $pid = fork);
return $self->emit(spawn => $pid)->{pool}{$pid} = {time => time} if $pid;
return $self->emit(spawn => $pid)->{pool}{$pid} = {time => steady_time}
if $pid;

# Prepare lock file
my $file = $self->{lock_file};
Expand Down
17 changes: 16 additions & 1 deletion lib/Mojo/Util.pm
Expand Up @@ -8,6 +8,10 @@ use Encode 'find_encoding';
use File::Basename 'dirname';
use File::Spec::Functions 'catfile';
use MIME::Base64 qw(decode_base64 encode_base64);
use Time::HiRes ();

# Check for monotonic clock support (only true if EV 4.0 or newer can be
# loaded, the clock itself is not probed here)
use constant MONOTONIC => eval 'use EV 4.0; 1';

# Punycode bootstring parameters
use constant {
Expand Down Expand Up @@ -45,12 +49,17 @@ our @EXPORT_OK = (
qw(decode deprecated encode get_line hmac_md5_sum hmac_sha1_sum),
qw(html_unescape md5_bytes md5_sum monkey_patch punycode_decode),
qw(punycode_encode quote secure_compare sha1_bytes sha1_sum slurp spurt),
qw(squish trim unquote url_escape url_unescape xml_escape xor_encode)
qw(squish steady_time trim unquote url_escape url_unescape xml_escape),
qw(xor_encode)
);

# DEPRECATED in Rainbow!
push @EXPORT_OK, 'html_escape';

# Fall back to high resolution time if monotonic clock is not available
# NOTE(review): libev describes ev_time as returning the current (wall-clock)
# time, so EV::time may not actually be monotonic - confirm before relying on
# it for timeout arithmetic
monkey_patch(__PACKAGE__, 'steady_time',
MONOTONIC ? \&EV::time : \&Time::HiRes::time);

# Base64 decode the first argument
sub b64_decode {
  my $encoded = $_[0];
  return decode_base64($encoded);
}

# Base64 encode the first argument, the optional second argument is the line
# ending handed on to encode_base64
sub b64_encode {
  my ($bytes, $eol) = @_;
  return encode_base64($bytes, $eol);
}
Expand Down Expand Up @@ -607,6 +616,12 @@ Write all data at once to file.
Trim whitespace characters from both ends of string and then change all
consecutive groups of whitespace into one space each.
=head2 steady_time
my $time = steady_time;
High resolution time, monotonic if possible.
=head2 trim
my $trimmed = trim $string;
Expand Down
7 changes: 5 additions & 2 deletions t/mojo/util.t
Expand Up @@ -12,8 +12,8 @@ use Mojo::Util
qw(b64_decode b64_encode camelize class_to_file class_to_path decamelize),
qw(decode encode get_line hmac_md5_sum hmac_sha1_sum html_unescape),
qw(md5_bytes md5_sum monkey_patch punycode_decode squish trim unquote),
qw(secure_compare sha1_bytes sha1_sum slurp spurt punycode_encode quote),
qw(url_escape url_unescape xml_escape xor_encode);
qw(secure_compare sha1_bytes sha1_sum slurp spurt steady_time),
qw(punycode_encode quote url_escape url_unescape xml_escape xor_encode);

# camelize
is camelize('foo_bar_baz'), 'FooBarBaz', 'right camelized result';
Expand Down Expand Up @@ -378,6 +378,9 @@ my $file = catfile $dir, 'test.txt';
spurt "just\nworks!", $file;
is slurp($file), "just\nworks!", 'successful roundtrip';

# steady_time (must stringify as digits, a decimal point and more digits)
# NOTE(review): a value without a fractional part would stringify without a
# decimal point and fail this pattern - unlikely, but worth confirming
like steady_time, qr/^\d+\.\d+$/, 'high resolution time';

# monkey_patch
{

Expand Down

0 comments on commit 457572f

Please sign in to comment.