Skip to content

Commit

Permalink
add stop_gracefully method to Mojo::IOLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Feb 16, 2015
1 parent a8623de commit f20bb22
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 51 deletions.
3 changes: 1 addition & 2 deletions Changes
@@ -1,14 +1,13 @@

5.80 2015-02-17
- Deprecated Mojo::IOLoop::max_connections in favor of
Mojo::IOLoop::concurrency.
- Removed accept_interval, lock and unlock attributes from Mojo::IOLoop.
- Removed accept_interval, lock_file and lock_timeout attributes from
Mojo::Server::Prefork.
- Removed --accept-interval, --lock-file and --lock-timeout options from
prefork command.
- Removed accept_interval, lock_file and lock_timeout parameters from
Hypnotoad.
- Added stop_gracefully method to Mojo::IOLoop.
- Reduced idle CPU usage of Mojo::IOLoop and Mojo::Server::Prefork.
- Improved app generator command to use current best practices.
- Fixed url_for to handle paths without trailing slash correctly in embedded
Expand Down
81 changes: 40 additions & 41 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -9,14 +9,15 @@ use Mojo::IOLoop::Delay;
use Mojo::IOLoop::Server;
use Mojo::IOLoop::Stream;
use Mojo::Reactor::Poll;
use Mojo::Util qw(deprecated md5_sum steady_time);
use Mojo::Util qw(md5_sum steady_time);
use Scalar::Util qw(blessed weaken);

use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;

has max_accepts => 0;
has multi_accept => 50;
has reactor => sub {
has max_accepts => 0;
has max_connections => 1000;
has multi_accept => 50;
has reactor => sub {
my $class = Mojo::Reactor::Poll->detect;
warn "-- Reactor initialized ($class)\n" if DEBUG;
return $class->new->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" });
Expand Down Expand Up @@ -73,14 +74,6 @@ sub client {
return $id;
}

# Combined getter/setter for the concurrency value, works on the singleton
# when called as a class method (see "_instance")
sub concurrency {
my $self = _instance(shift);
# Getter: without arguments return the stored value, defaulting it to 1000
return $self->{concurrency} //= 1000 unless @_;
my $concurrency = $self->{concurrency} = shift;
# A value of 0 installs a recurring "_graceful" check with an interval of 1,
# unless {stop} already holds one
$self->{stop} ||= $self->recurring(1 => \&_graceful) if $concurrency == 0;
# Setter returns the invocant
return $self;
}

sub delay {
my $delay = Mojo::IOLoop::Delay->new;
weaken $delay->ioloop(_instance(shift))->{ioloop};
Expand All @@ -89,13 +82,6 @@ sub delay {

# Delegate to the reactor's "is_running" (class or instance method)
sub is_running { _instance(shift)->reactor->is_running }

# DEPRECATED in Tiger Face!
# Passes the deprecation message to "deprecated" (imported from Mojo::Util)
# and then forwards all remaining arguments to "concurrency"
sub max_connections {
deprecated 'Mojo::IOLoop::max_connections is DEPRECATED in favor of'
. ' Mojo::IOLoop::concurrency';
shift->concurrency(@_);
}

sub next_tick {
my ($self, $cb) = (_instance(shift), @_);
weaken $self;
Expand Down Expand Up @@ -136,7 +122,7 @@ sub server {
# Enforce connection limit (randomize to improve load balancing)
if (my $max = $self->max_accepts) {
$self->{accepts} //= $max - int rand $max / 2;
$self->concurrency(0) if ($self->{accepts} -= 1) <= 0;
$self->stop_gracefully if ($self->{accepts} -= 1) <= 0;
}

my $stream = Mojo::IOLoop::Stream->new(pop);
Expand All @@ -158,6 +144,12 @@ sub start {

# Delegate to the reactor's "stop" (class or instance method)
sub stop { _instance(shift)->reactor->stop }

# Stop accepting new connections and stop the event loop once all existing
# connections have been closed (class or instance method)
sub stop_gracefully {
my $self = _instance(shift);
$self->_not_accepting;
# Install the recurring "_stop" check with an interval of 1, but only once;
# the value returned by "recurring" is kept in {stop}, which "_limit" also
# treats as a reason to not accept connections
$self->{stop} ||= $self->recurring(1 => \&_stop);
}

sub stream {
my ($self, $stream) = (_instance(shift), @_);
return ($self->{connections}{$stream} || {})->{stream} unless ref $stream;
Expand All @@ -166,13 +158,6 @@ sub stream {

# Delegate to "_timer" with the type "timer" followed by the caller's arguments
sub timer { shift->_timer(timer => @_) }

# Recurring callback installed by "concurrency" when the value is set to 0,
# stops the event loop once no connections are left
sub _graceful {
my $self = shift;
# Connections still exist, do nothing on this run
return if keys %{$self->{connections}};
# Hand the value stored in {stop} to "_remove" (presumably cancelling the
# recurring check, confirm against "_remove") and stop the event loop
$self->_remove(delete $self->{stop});
$self->stop;
}

sub _id {
my $self = shift;
my $id;
Expand All @@ -183,7 +168,9 @@ sub _id {

# Return the invocant if it is a reference, otherwise the result of calling
# "singleton" on it, so methods work as both class and instance methods
sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton }

# True once the number of connections has reached the "concurrency" value
sub _limit { keys %{$_[0]->{connections}} >= $_[0]->concurrency }
# True if {stop} is set (as done by "stop_gracefully") or the number of
# connections has reached "max_connections"
sub _limit {
$_[0]{stop} || keys %{$_[0]{connections}} >= $_[0]->max_connections;
}

sub _maybe_accepting {
my $self = shift;
Expand All @@ -193,7 +180,7 @@ sub _maybe_accepting {

# Check if multi-accept is desirable
my $m = $self->multi_accept;
$m = 1 if $self->concurrency < $m;
$m = 1 if $self->max_connections < $m;
$_->multi_accept($m)->start for values %{$self->{acceptors} || {}};
$self->{accepting} = 1;
}
Expand All @@ -220,6 +207,13 @@ sub _remove {
$self->_maybe_accepting;
}

# Recurring callback installed by "stop_gracefully", stops the event loop once
# no connections are left
sub _stop {
my $self = shift;
# Connections still exist, do nothing on this run
return if keys %{$self->{connections}};
# Hand the value stored in {stop} to "_remove" (presumably cancelling the
# recurring check, confirm against "_remove") and stop the event loop
$self->_remove(delete $self->{stop});
$self->stop;
}

sub _stream {
my ($self, $stream, $id) = @_;

Expand Down Expand Up @@ -337,6 +331,15 @@ to C<0>. Setting the value to C<0> will allow this event loop to accept new
connections indefinitely. Note that up to half of this value can be subtracted
randomly to improve load balancing between multiple server processes.
=head2 max_connections
my $max = $loop->max_connections;
$loop = $loop->max_connections(1000);
The maximum number of concurrent connections this event loop is allowed to
handle before it stops accepting new incoming connections, defaults to
C<1000>.
=head2 multi_accept
my $multi = $loop->multi_accept;
Expand Down Expand Up @@ -394,18 +397,6 @@ L<Mojo::IOLoop::Client/"connect">.
...
});
=head2 concurrency
my $max = Mojo::IOLoop->concurrency;
my $max = $loop->concurrency;
$loop = $loop->concurrency(1000);
The maximum number of concurrent connections this event loop is allowed to
handle before it stops accepting new incoming connections, defaults to
C<1000>. Setting the value to C<0> will make this event loop stop accepting
new connections and allow it to shut down gracefully without interrupting
existing connections.
=head2 delay
my $delay = Mojo::IOLoop->delay;
Expand Down Expand Up @@ -595,6 +586,14 @@ some reactors stop automatically if there are no events being watched anymore.
Stop the event loop, this will not interrupt any existing connections and the
event loop can be restarted by running L</"start"> again.
=head2 stop_gracefully
Mojo::IOLoop->stop_gracefully;
$loop->stop_gracefully;
Stop accepting new connections and wait for all existing connections to be
closed before stopping the event loop.
=head2 stream
my $stream = Mojo::IOLoop->stream($id);
Expand Down
4 changes: 2 additions & 2 deletions lib/Mojo/Server/Daemon.pm
Expand Up @@ -41,7 +41,7 @@ sub start {

# Start listening
else { $self->_listen($_) for @{$self->listen} }
if (my $max = $self->max_clients) { $loop->concurrency($max) }
if (my $max = $self->max_clients) { $loop->max_connections($max) }

return $self;
}
Expand Down Expand Up @@ -401,7 +401,7 @@ TLS verification mode, defaults to C<0x03>.
$daemon = $daemon->max_clients(1000);
Maximum number of concurrent client connections, passed along to
L<Mojo::IOLoop/"concurrency">.
L<Mojo::IOLoop/"max_connections">.
=head2 max_requests
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Server/Hypnotoad.pm
Expand Up @@ -263,7 +263,7 @@ L<Mojo::Server::Daemon/"backlog">.
clients => 100
Maximum number of concurrent client connections per worker process, defaults
to the value of L<Mojo::IOLoop/"concurrency">. Note that high concurrency
to the value of L<Mojo::IOLoop/"max_connections">. Note that high concurrency
works best with applications that perform mostly non-blocking operations, to
optimize for blocking operations you can decrease this value and increase
L</"workers"> instead for better performance.
Expand Down
4 changes: 2 additions & 2 deletions lib/Mojo/Server/Prefork.pm
Expand Up @@ -154,14 +154,14 @@ sub _spawn {

# Heartbeat messages
weaken $self;
my $cb = sub { $self->_heartbeat(shift->concurrency ? 0 : 1) };
my $cb = sub { $self->_heartbeat(shift->max_connections ? 0 : 1) };
my $loop = $self->ioloop;
$loop->next_tick($cb);
$loop->recurring($self->heartbeat_interval => $cb);

# Clean worker environment
$SIG{$_} = 'DEFAULT' for qw(CHLD INT TERM TTIN TTOU);
$SIG{QUIT} = sub { $loop->concurrency(0) };
$SIG{QUIT} = sub { $loop->stop_gracefully };
delete @$self{qw(poll reader)};
srand;

Expand Down
6 changes: 3 additions & 3 deletions t/mojo/ioloop.t
Expand Up @@ -243,15 +243,15 @@ ok length($server) > length($server_after), 'stream has been resumed';
is $client, $client_after, 'stream was writable while paused';
is $client, 'works!', 'full message has been written';

# Graceful shutdown (concurrency)
# Graceful shutdown
$err = '';
$loop = Mojo::IOLoop->new->concurrency(0);
$loop = Mojo::IOLoop->new;
$loop->stop_gracefully;
$loop->remove(
$loop->client({port => Mojo::IOLoop::Server->generate_port} => sub { }));
$loop->timer(3 => sub { shift->stop; $err = 'failed' });
$loop->start;
ok !$err, 'no error';
is $loop->concurrency, 0, 'right value';

# Graceful shutdown (max_accepts)
$err = '';
Expand Down

0 comments on commit f20bb22

Please sign in to comment.