Skip to content

Commit

Permalink
Merge pull request #421 from kraih/smooth_morbo
Browse files Browse the repository at this point in the history
Smooth restarting for Morbo
  • Loading branch information
kraih committed Nov 27, 2012
2 parents d2b538c + 75c2fee commit 426428e
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 41 deletions.
50 changes: 34 additions & 16 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -10,7 +10,7 @@ use Mojo::IOLoop::Server;
use Mojo::IOLoop::Stream;
use Mojo::Reactor::Poll;
use Mojo::Util 'md5_sum';
use Scalar::Util qw(blessed weaken);
use Scalar::Util 'weaken';
use Time::HiRes 'time';

use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
Expand All @@ -32,6 +32,28 @@ $SIG{PIPE} = 'IGNORE';
# Initialize singleton reactor early
__PACKAGE__->singleton->reactor;

sub acceptor {
my ($self, $server) = @_;
$self = $self->singleton unless ref $self;

# Find server for id
return $self->{servers}{$server} unless ref $server;

# Make sure connection manager is running
$self->_manager;

# New server
my $id = $self->_id;
$self->{servers}{$id} = $server;
weaken $server->reactor($self->reactor)->{reactor};
$self->{accepts} = $self->max_accepts if $self->max_accepts;

# Stop accepting
$self->_not_accepting;

return $id;
}

sub client {
my ($self, $cb) = (shift, pop);
$self = $self->singleton unless ref $self;
Expand Down Expand Up @@ -113,15 +135,7 @@ sub server {
my ($self, $cb) = (shift, pop);
$self = $self->singleton unless ref $self;

# Make sure connection manager is running
$self->_manager;

# New server
my $id = $self->_id;
my $server = $self->{servers}{$id} = Mojo::IOLoop::Server->new;
weaken $server->reactor($self->reactor)->{reactor};

# Listen
my $server = Mojo::IOLoop::Server->new;
weaken $self;
$server->on(
accept => sub {
Expand All @@ -141,12 +155,8 @@ sub server {
}
);
$server->listen(@_);
$self->{accepts} = $self->max_accepts if $self->max_accepts;

# Stop accepting
$self->_not_accepting;

return $id;
return $self->acceptor($server);
}

sub singleton { state $loop ||= shift->SUPER::new }
Expand All @@ -167,7 +177,7 @@ sub stream {
$self = $self->singleton unless ref $self;

# Connect stream with reactor
return $self->_stream($stream, $self->_id) if blessed $stream;
return $self->_stream($stream, $self->_id) if ref $stream;

# Find stream for id
return undef unless my $c = $self->{connections}{$stream};
Expand Down Expand Up @@ -434,6 +444,14 @@ processes. Note that exceptions in this callback are not captured.
L<Mojo::IOLoop> inherits all methods from L<Mojo::Base> and implements the
following new ones.
=head2 C<acceptor>
my $server = Mojo::IOLoop->acceptor($id);
my $server = $loop->acceptor($id);
my $id = $loop->acceptor(Mojo::IOLoop::Server->new);
Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor.
=head2 C<client>
my $id
Expand Down
37 changes: 33 additions & 4 deletions lib/Mojo/Server/Daemon.pm
Expand Up @@ -20,7 +20,7 @@ sub DESTROY {
my $self = shift;
return unless my $loop = $self->ioloop;
$self->_remove($_) for keys %{$self->{connections} || {}};
$loop->remove($_) for @{$self->{listening} || []};
$loop->remove($_) for @{$self->{acceptors} || []};
}

sub run {
Expand Down Expand Up @@ -55,8 +55,31 @@ sub setuidgid {

sub start {
my $self = shift;
$self->_listen($_) for @{$self->listen};
$self->ioloop->max_connections($self->max_clients);

# Resume accepting connections
my $loop = $self->ioloop;
if (my $acceptors = $self->{acceptors}) {
push @$acceptors, $loop->acceptor(delete $self->{servers}{$_})
for keys %{$self->{servers}};
}

# Start listening
else { $self->_listen($_) for @{$self->listen} }
$loop->max_connections($self->max_clients);

return $self;
}

sub stop {
my $self = shift;

my $loop = $self->ioloop;
while (my $id = shift @{$self->{acceptors}}) {
$self->{servers}{$id} = my $server = $loop->acceptor($id);
$loop->remove($id);
$server->stop;
}

return $self;
}

Expand Down Expand Up @@ -192,7 +215,7 @@ sub _listen {
sub { $self->app->log->debug('Inactivity timeout.') if $c->{tx} });
}
);
push @{$self->{listening} ||= []}, $id;
push @{$self->{acceptors} ||= []}, $id;

# Friendly message
return if $self->silent;
Expand Down Expand Up @@ -436,6 +459,12 @@ Set user and group for process.
Start accepting connections.
=head2 C<stop>
$daemon = $daemon->stop;
Stop accepting connections.
=head1 DEBUGGING
You can set the C<MOJO_DAEMON_DEBUG> environment variable to get some advanced
Expand Down
7 changes: 6 additions & 1 deletion lib/Mojo/Server/Morbo.pm
Expand Up @@ -25,14 +25,19 @@ sub check_file {
sub run {
my ($self, $app) = @_;

# Watch files and manage worker
# Prepare environment
$SIG{CHLD} = sub { $self->_reap };
$SIG{INT} = $SIG{TERM} = $SIG{QUIT} = sub {
$self->{finished} = 1;
kill 'TERM', $self->{running} if $self->{running};
};
unshift @{$self->watch}, $app;
$self->{modified} = 1;

# Prepare listen sockets
my $daemon = Mojo::Server::Daemon->new->start->stop;

# Manage
$self->_manage while !$self->{finished} || $self->{running};
exit 0;
}
Expand Down
21 changes: 21 additions & 0 deletions t/mojo/app.t
Expand Up @@ -200,4 +200,25 @@ ok $local_port > 0, 'has local port';
ok $remote_address, 'has local address';
ok $remote_port > 0, 'has local port';

# Throttling
$port = Mojo::IOLoop->generate_port;
my $daemon = Mojo::Server::Daemon->new(app => $app,
listen => ["http://127.0.0.1:$port"]);
$daemon->start;
$tx = $ua->get("http://127.0.0.1:$port/throttle1" => {Connection => 'close'});
ok $tx->success, 'successful';
is $tx->res->code, 200, 'right status';
is $tx->res->body, 'Your Mojo is working!', 'right content';
$daemon->stop;
$tx = $ua->inactivity_timeout(0.5)
->get("http://127.0.0.1:$port/throttle2" => {Connection => 'close'});
ok !$tx->success, 'not successful';
is $tx->error, 'Inactivity timeout', 'right error';
$daemon->start;
$tx = $ua->inactivity_timeout(10)
->get("http://127.0.0.1:$port/throttle3" => {Connection => 'close'});
ok $tx->success, 'successful';
is $tx->res->code, 200, 'right status';
is $tx->res->body, 'Your Mojo is working!', 'right content';

done_testing();
17 changes: 10 additions & 7 deletions t/mojo/ioloop.t
Expand Up @@ -79,18 +79,21 @@ ok $count < 10, 'less than ten recurring events';

# Handle
my $port = Mojo::IOLoop->generate_port;
my $handle;
$id = $loop->server(
my ($handle, $handle2);
$id = Mojo::IOLoop->server(
(address => '127.0.0.1', port => $port) => sub {
my ($loop, $stream) = @_;
$handle = $stream->handle;
$loop->stop;
Mojo::IOLoop->stop;
}
);
$id2 = $loop->client((address => 'localhost', port => $port) => sub { });
$loop->start;
$loop->remove($id);
$loop->remove($id2);
Mojo::IOLoop->acceptor($id)->on(accept => sub { $handle2 = pop });
$id2
= Mojo::IOLoop->client((address => 'localhost', port => $port) => sub { });
Mojo::IOLoop->start;
Mojo::IOLoop->remove($id);
Mojo::IOLoop->remove($id2);
is $handle, $handle2, 'handles are equal';
isa_ok $handle, 'IO::Socket', 'right reference';

# The poll reactor stops when there are no events being watched anymore
Expand Down
13 changes: 0 additions & 13 deletions t/mojo/morbo.t
Expand Up @@ -40,7 +40,6 @@ my $port = Mojo::IOLoop->generate_port;
my $prefix = "$FindBin::Bin/../../script";
my $pid = open my $server, '-|', $^X, "$prefix/morbo", '-l',
"http://127.0.0.1:$port", $script;
sleep 3;
sleep 1
while !IO::Socket::INET->new(
Proto => 'tcp',
Expand Down Expand Up @@ -77,12 +76,6 @@ ok $morbo->check_file($script), 'file has changed';
ok((stat $script)[9] > $mtime, 'modify time has changed');
is((stat $script)[7], $size, 'still equal size');
sleep 3;
sleep 1
while !IO::Socket::INET->new(
Proto => 'tcp',
PeerAddr => '127.0.0.1',
PeerPort => $port
);

# Application has been reloaded
$tx = $ua->get("http://127.0.0.1:$port/hello");
Expand Down Expand Up @@ -113,12 +106,6 @@ ok $morbo->check_file($script), 'file has changed';
ok((stat $script)[9] == $mtime, 'modify time has not changed');
isnt((stat $script)[7], $size, 'size has changed');
sleep 3;
sleep 1
while !IO::Socket::INET->new(
Proto => 'tcp',
PeerAddr => '127.0.0.1',
PeerPort => $port
);

# Application has been reloaded again
$tx = $ua->get("http://127.0.0.1:$port/hello");
Expand Down

0 comments on commit 426428e

Please sign in to comment.