Skip to content

Commit

Permalink
reduce idle CPU usage by not using an accept mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Feb 16, 2015
1 parent 2013377 commit 15ba0ad
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 239 deletions.
10 changes: 10 additions & 0 deletions Changes
@@ -1,5 +1,15 @@

5.80 2015-02-16
- Deprecated Mojo::IOLoop::max_connections in favor of
Mojo::IOLoop::concurrency.
- Removed accept_interval, lock and unlock attributes from Mojo::IOLoop.
- Removed accept_interval, lock_file and lock_timeout attributes from
Mojo::Server::Prefork.
- Removed --accept-interval, --lock-file and --lock-timeout options from
prefork command.
- Removed accept_interval, lock_file and lock_timeout parameters from
Hypntoad.
- Reduced idle CPU usage of Mojo::IOLoop and Mojo::Server::Prefork.
- Improved app generator command to use current best practices.
- Fixed url_for to handle paths without trailing slash correctly in embedded
applications.
Expand Down
163 changes: 60 additions & 103 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -9,17 +9,14 @@ use Mojo::IOLoop::Delay;
use Mojo::IOLoop::Server;
use Mojo::IOLoop::Stream;
use Mojo::Reactor::Poll;
use Mojo::Util qw(md5_sum steady_time);
use Mojo::Util qw(deprecated md5_sum steady_time);
use Scalar::Util qw(blessed weaken);

use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;

has accept_interval => 0.025;
has [qw(lock unlock)];
has max_accepts => 0;
has max_connections => 1000;
has multi_accept => 50;
has reactor => sub {
has max_accepts => 0;
has multi_accept => 50;
has reactor => sub {
my $class = Mojo::Reactor::Poll->detect;
warn "-- Reactor initialized ($class)\n" if DEBUG;
return $class->new->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" });
Expand All @@ -44,16 +41,14 @@ sub acceptor {

# Allow new acceptor to get picked up
$self->_not_accepting;
$self->_maybe_accepting;

return $id;
}

sub client {
my ($self, $cb) = (_instance(shift), pop);

# Make sure timers are running
$self->_recurring;

my $id = $self->_id;
my $client = $self->{connections}{$id}{client} = Mojo::IOLoop::Client->new;
weaken $client->reactor($self->reactor)->{reactor};
Expand All @@ -78,6 +73,14 @@ sub client {
return $id;
}

sub concurrency {
my $self = _instance(shift);
return $self->{concurrency} //= 1000 unless @_;
my $concurrency = $self->{concurrency} = shift;
$self->{stop} ||= $self->recurring(1 => \&_stop) if $concurrency == 0;
return $self;
}

sub delay {
my $delay = Mojo::IOLoop::Delay->new;
weaken $delay->ioloop(_instance(shift))->{ioloop};
Expand All @@ -86,6 +89,13 @@ sub delay {

sub is_running { _instance(shift)->reactor->is_running }

# DEPRECATED in Tiger Face!
sub max_connections {
deprecated 'Mojo::IOLoop::max_connections is DEPRECATED in favor of'
. ' Mojo::IOLoop::concurrency';
shift->concurrency(@_);
}

sub next_tick {
my ($self, $cb) = (_instance(shift), @_);
weaken $self;
Expand All @@ -107,8 +117,9 @@ sub reset {
my $self = _instance(shift);
$self->_remove($_)
for keys %{$self->{acceptors}}, keys %{$self->{connections}};
$self->_remove($self->{stop}) if $self->{stop};
$self->reactor->reset;
$self->$_ for qw(_stop stop);
$self->stop;
}

sub server {
Expand All @@ -119,13 +130,13 @@ sub server {
$server->on(
accept => sub {

# Release accept mutex
$self->_not_accepting;
# Stop accepting if connection limit has been reached
$self->_not_accepting if $self->_limit;

# Enforce connection limit (randomize to improve load balancing)
if (my $max = $self->max_accepts) {
$self->{accepts} //= $max - int rand $max / 2;
$self->max_connections(0) if ($self->{accepts} -= 1) <= 0;
$self->concurrency(0) if ($self->{accepts} -= 1) <= 0;
}

my $stream = Mojo::IOLoop::Stream->new(pop);
Expand Down Expand Up @@ -155,28 +166,6 @@ sub stream {

sub timer { shift->_timer(timer => @_) }

sub _accepting {
my $self = shift;

# Check if we have acceptors
my $acceptors = $self->{acceptors} ||= {};
return $self->_remove(delete $self->{accept}) unless keys %$acceptors;

# Check connection limit
my $i = keys %{$self->{connections}};
my $max = $self->max_connections;
return unless $i < $max;

# Acquire accept mutex
if (my $cb = $self->lock) { return unless $cb->(!$i) }
$self->_remove(delete $self->{accept});

# Check if multi-accept is desirable
my $multi = $self->multi_accept;
$_->multi_accept($max < $multi ? 1 : $multi)->start for values %$acceptors;
$self->{accepting} = 1;
}

sub _id {
my $self = shift;
my $id;
Expand All @@ -187,24 +176,25 @@ sub _id {

sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton }

sub _not_accepting {
my $self = shift;
sub _limit { keys %{$_[0]->{connections}} >= $_[0]->concurrency }

# Make sure timers are running
$self->_recurring;
sub _maybe_accepting {
my $self = shift;

# Release accept mutex
return unless delete $self->{accepting};
return unless my $cb = $self->unlock;
$cb->();
# Check connection limit
return if $self->{accepting} || $self->_limit;

$_->stop for values %{$self->{acceptors} || {}};
# Check if multi-accept is desirable
my $m = $self->multi_accept;
$m = 1 if $self->concurrency < $m;
$_->multi_accept($m)->start for values %{$self->{acceptors} || {}};
$self->{accepting} = 1;
}

sub _recurring {
sub _not_accepting {
my $self = shift;
$self->{accept} ||= $self->recurring($self->accept_interval => \&_accepting);
$self->{stop} ||= $self->recurring(1 => \&_stop);
return unless delete $self->{accepting};
$_->stop for values %{$self->{acceptors} || {}};
}

sub _remove {
Expand All @@ -214,27 +204,26 @@ sub _remove {
return unless my $reactor = $self->reactor;
return if $reactor->remove($id);

# Acceptor
if (delete $self->{acceptors}{$id}) { $self->_not_accepting }

# Connection
else { delete $self->{connections}{$id} }
return $self->_maybe_accepting if delete $self->{connections}{$id};

# Acceptor
return unless delete $self->{acceptors}{$id};
$self->_not_accepting;
$self->_maybe_accepting;
}

sub _stop {
my $self = shift;
return if keys %{$self->{connections}};
$self->stop if $self->max_connections == 0;
return if keys %{$self->{acceptors}};
$self->{$_} && $self->_remove(delete $self->{$_}) for qw(accept stop);
return if keys %{$self->{connections}};
$self->stop if $self->concurrency == 0;
return if keys %{$self->{acceptors}};
$self->_remove(delete $self->{stop}) if $self->{stop};
}

sub _stream {
my ($self, $stream, $id) = @_;

# Make sure timers are running
$self->_recurring;

# Connect stream with reactor
$self->{connections}{$id}{stream} = $stream;
weaken $stream->reactor($self->reactor)->{reactor};
Expand Down Expand Up @@ -338,31 +327,6 @@ See L<Mojolicious::Guides::Cookbook/"REAL-TIME WEB"> for more.
L<Mojo::IOLoop> implements the following attributes.
=head2 accept_interval
my $interval = $loop->accept_interval;
$loop = $loop->accept_interval(0.5);
Interval in seconds for trying to reacquire the accept mutex, defaults to
C<0.025>. Note that changing this value can affect performance and idle CPU
usage.
=head2 lock
my $cb = $loop->lock;
$loop = $loop->lock(sub {...});
A callback for acquiring the accept mutex, used to sync multiple server
processes. The callback should return true or false. Note that exceptions in
this callback are not captured.
$loop->lock(sub {
my $blocking = shift;
# Got the accept mutex, start accepting new connections
return 1;
});
=head2 max_accepts
my $max = $loop->max_accepts;
Expand All @@ -374,17 +338,6 @@ to C<0>. Setting the value to C<0> will allow this event loop to accept new
connections indefinitely. Note that up to half of this value can be subtracted
randomly to improve load balancing between multiple server processes.
=head2 max_connections
my $max = $loop->max_connections;
$loop = $loop->max_connections(1000);
The maximum number of concurrent connections this event loop is allowed to
handle before stopping to accept new incoming connections, defaults to
C<1000>. Setting the value to C<0> will make this event loop stop accepting
new connections and allow it to shut down gracefully without interrupting
existing connections.
=head2 multi_accept
my $multi = $loop->multi_accept;
Expand Down Expand Up @@ -413,14 +366,6 @@ L<Mojo::Reactor/"error">.
# Remove handle again
$loop->reactor->remove($handle);
=head2 unlock
my $cb = $loop->unlock;
$loop = $loop->unlock(sub {...});
A callback for releasing the accept mutex, used to sync multiple server
processes. Note that exceptions in this callback are not captured.
=head1 METHODS
L<Mojo::IOLoop> inherits all methods from L<Mojo::Base> and implements the
Expand Down Expand Up @@ -450,6 +395,18 @@ L<Mojo::IOLoop::Client/"connect">.
...
});
=head2 concurrency
my $max = Mojo::IOLoop->concurrency;
my $max = $loop->concurrency;
$loop = $loop->concurrency(1000);
The maximum number of concurrent connections this event loop is allowed to
handle before stopping to accept new incoming connections, defaults to
C<1000>. Setting the value to C<0> will make this event loop stop accepting
new connections and allow it to shut down gracefully without interrupting
existing connections.
=head2 delay
my $delay = Mojo::IOLoop->delay;
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/IOLoop/Client.pm
Expand Up @@ -65,8 +65,8 @@ sub connect {

sub _cleanup {
my $self = shift;
return $self unless my $reactor = $self->reactor;
$NDN->timedout($self->{dns}) if $self->{dns};
my $reactor = $self->reactor;
$self->{$_} && $reactor->remove(delete $self->{$_}) for qw(dns timer handle);
return $self;
}
Expand Down
7 changes: 4 additions & 3 deletions lib/Mojo/Server/Daemon.pm
Expand Up @@ -16,9 +16,10 @@ has listen => sub { [split ',', $ENV{MOJO_LISTEN} || 'http://*:3000'] };
has max_requests => 25;

sub DESTROY {
return if Mojo::Util::_global_destruction();
my $self = shift;
return unless my $loop = $self->ioloop;
$self->_remove($_) for keys %{$self->{connections} || {}};
my $loop = $self->ioloop;
$loop->remove($_) for @{$self->acceptors};
}

Expand All @@ -40,7 +41,7 @@ sub start {

# Start listening
else { $self->_listen($_) for @{$self->listen} }
if (my $max = $self->max_clients) { $loop->max_connections($max) }
if (my $max = $self->max_clients) { $loop->concurrency($max) }

return $self;
}
Expand Down Expand Up @@ -400,7 +401,7 @@ TLS verification mode, defaults to C<0x03>.
$daemon = $daemon->max_clients(1000);
Maximum number of concurrent client connections, passed along to
L<Mojo::IOLoop/"max_connections">.
L<Mojo::IOLoop/"concurrency">.
=head2 max_requests
Expand Down

0 comments on commit 15ba0ad

Please sign in to comment.