Skip to content

Commit

Permalink
made Morbo restarts a lot smoother
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Nov 26, 2012
1 parent a6d4120 commit ba5388f
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 36 deletions.
53 changes: 36 additions & 17 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -10,7 +10,7 @@ use Mojo::IOLoop::Server;
use Mojo::IOLoop::Stream;
use Mojo::Reactor::Poll;
use Mojo::Util 'md5_sum';
use Scalar::Util qw(blessed weaken);
use Scalar::Util 'weaken';
use Time::HiRes 'time';

use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
Expand All @@ -32,6 +32,28 @@ $SIG{PIPE} = 'IGNORE';
# Initialize singleton reactor early
__PACKAGE__->singleton->reactor;

sub accepting {
my ($self, $server) = @_;
$self = $self->singleton unless ref $self;

# Find server for id
return $self->{servers}{$server} unless ref $server;

# Make sure connection manager is running
$self->_manager;

# New server
my $id = $self->_id;
$self->{servers}{$id} = $server;
weaken $server->reactor($self->reactor)->{reactor};
$self->{accepts} = $self->max_accepts if $self->max_accepts;

# Stop accepting
$self->_not_accepting;

return $id;
}

sub client {
my ($self, $cb) = (shift, pop);
$self = $self->singleton unless ref $self;
Expand Down Expand Up @@ -113,15 +135,7 @@ sub server {
my ($self, $cb) = (shift, pop);
$self = $self->singleton unless ref $self;

# Make sure connection manager is running
$self->_manager;

# New server
my $id = $self->_id;
my $server = $self->{servers}{$id} = Mojo::IOLoop::Server->new;
weaken $server->reactor($self->reactor)->{reactor};

# Listen
my $server = Mojo::IOLoop::Server->new;
weaken $self;
$server->on(
accept => sub {
Expand All @@ -141,12 +155,8 @@ sub server {
}
);
$server->listen(@_);
$self->{accepts} = $self->max_accepts if $self->max_accepts;

# Stop accepting
$self->_not_accepting;

return $id;
return $self->accepting($server);
}

sub singleton { state $loop ||= shift->SUPER::new }
Expand All @@ -167,7 +177,7 @@ sub stream {
$self = $self->singleton unless ref $self;

# Connect stream with reactor
return $self->_stream($stream, $self->_id) if blessed $stream;
return $self->_stream($stream, $self->_id) if ref $stream;

# Find stream for id
return undef unless my $c = $self->{connections}{$stream};
Expand Down Expand Up @@ -434,6 +444,15 @@ processes. Note that exceptions in this callback are not captured.
L<Mojo::IOLoop> inherits all methods from L<Mojo::Base> and implements the
following new ones.
=head2 C<accepting>
my $server = Mojo::IOLoop->accepting($id);
my $server = $loop->accepting($id);
my $id = $loop->accepting(Mojo::IOLoop::Server->new);
Get L<Mojo::IOLoop::Server> object for id or start accepting connections from
object.
=head2 C<client>
my $id
Expand Down Expand Up @@ -588,7 +607,7 @@ event loop can be restarted by running C<start> again.
my $stream = Mojo::IOLoop->stream($id);
my $stream = $loop->stream($id);
my $id = $loop->stream($stream);
my $id = $loop->stream(Mojo::IOLoop::Stream->new);
Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
Expand Down
37 changes: 33 additions & 4 deletions lib/Mojo/Server/Daemon.pm
Expand Up @@ -20,7 +20,7 @@ sub DESTROY {
my $self = shift;
return unless my $loop = $self->ioloop;
$self->_remove($_) for keys %{$self->{connections} || {}};
$loop->remove($_) for @{$self->{listening} || []};
$loop->remove($_) for @{$self->{accepting} || []};
}

sub run {
Expand Down Expand Up @@ -55,8 +55,31 @@ sub setuidgid {

sub start {
my $self = shift;
$self->_listen($_) for @{$self->listen};
$self->ioloop->max_connections($self->max_clients);

# Resume accepting connections
my $loop = $self->ioloop;
if (my $accepting = $self->{accepting}) {
push @$accepting, $loop->accepting(delete $self->{servers}{$_})
for keys %{$self->{servers}};
}

# Start listening
else { $self->_listen($_) for @{$self->listen} }
$loop->max_connections($self->max_clients);

return $self;
}

sub stop {
my $self = shift;

my $loop = $self->ioloop;
while (my $id = shift @{$self->{accepting}}) {
$self->{servers}{$id} = my $server = $loop->accepting($id);
$loop->remove($id);
$server->stop;
}

return $self;
}

Expand Down Expand Up @@ -192,7 +215,7 @@ sub _listen {
sub { $self->app->log->debug('Inactivity timeout.') if $c->{tx} });
}
);
push @{$self->{listening} ||= []}, $id;
push @{$self->{accepting} ||= []}, $id;

# Friendly message
return if $self->silent;
Expand Down Expand Up @@ -436,6 +459,12 @@ Set user and group for process.
Start accepting connections.
=head2 C<stop>
$daemon = $daemon->stop;
Stop accepting connections.
=head1 DEBUGGING
You can set the C<MOJO_DAEMON_DEBUG> environment variable to get some advanced
Expand Down
7 changes: 6 additions & 1 deletion lib/Mojo/Server/Morbo.pm
Expand Up @@ -25,14 +25,19 @@ sub check_file {
sub run {
my ($self, $app) = @_;

# Watch files and manage worker
# Prepare environment
$SIG{CHLD} = sub { $self->_reap };
$SIG{INT} = $SIG{TERM} = $SIG{QUIT} = sub {
$self->{finished} = 1;
kill 'TERM', $self->{running} if $self->{running};
};
unshift @{$self->watch}, $app;
$self->{modified} = 1;

# Prepare listen sockets
my $daemon = Mojo::Server::Daemon->new->start->stop;

# Manage
$self->_manage while !$self->{finished} || $self->{running};
exit 0;
}
Expand Down
4 changes: 3 additions & 1 deletion t/mojo/ioloop.t
Expand Up @@ -79,18 +79,20 @@ ok $count < 10, 'less than ten recurring events';

# Handle
my $port = Mojo::IOLoop->generate_port;
my $handle;
my ($handle, $handle2);
$id = $loop->server(
(address => '127.0.0.1', port => $port) => sub {
my ($loop, $stream) = @_;
$handle = $stream->handle;
$loop->stop;
}
);
$loop->accepting($id)->on(accept => sub { $handle2 = pop });
$id2 = $loop->client((address => 'localhost', port => $port) => sub { });
$loop->start;
$loop->remove($id);
$loop->remove($id2);
is $handle, $handle2, 'handles are equal';
isa_ok $handle, 'IO::Socket', 'right reference';

# The poll reactor stops when there are no events being watched anymore
Expand Down
13 changes: 0 additions & 13 deletions t/mojo/morbo.t
Expand Up @@ -40,7 +40,6 @@ my $port = Mojo::IOLoop->generate_port;
my $prefix = "$FindBin::Bin/../../script";
my $pid = open my $server, '-|', $^X, "$prefix/morbo", '-l',
"http://127.0.0.1:$port", $script;
sleep 3;
sleep 1
while !IO::Socket::INET->new(
Proto => 'tcp',
Expand Down Expand Up @@ -77,12 +76,6 @@ ok $morbo->check_file($script), 'file has changed';
ok((stat $script)[9] > $mtime, 'modify time has changed');
is((stat $script)[7], $size, 'still equal size');
sleep 3;
sleep 1
while !IO::Socket::INET->new(
Proto => 'tcp',
PeerAddr => '127.0.0.1',
PeerPort => $port
);

# Application has been reloaded
$tx = $ua->get("http://127.0.0.1:$port/hello");
Expand Down Expand Up @@ -113,12 +106,6 @@ ok $morbo->check_file($script), 'file has changed';
ok((stat $script)[9] == $mtime, 'modify time has not changed');
isnt((stat $script)[7], $size, 'size has changed');
sleep 3;
sleep 1
while !IO::Socket::INET->new(
Proto => 'tcp',
PeerAddr => '127.0.0.1',
PeerPort => $port
);

# Application has been reloaded again
$tx = $ua->get("http://127.0.0.1:$port/hello");
Expand Down
22 changes: 22 additions & 0 deletions t/mojo/user_agent.t
Expand Up @@ -10,6 +10,7 @@ use Test::More;
use IO::Compress::Gzip 'gzip';
use Mojo::IOLoop;
use Mojo::Message::Request;
use Mojo::Server::Daemon;
use Mojo::UserAgent;
use Mojolicious::Lite;

Expand Down Expand Up @@ -494,4 +495,25 @@ $tx = $ua->get("http://localhost:$port/");
ok !$tx->success, 'not successful';
is $tx->error, 'Premature connection close', 'right error';

# Inactivity timeout (throttling)
$port = Mojo::IOLoop->generate_port;
my $daemon = Mojo::Server::Daemon->new(app => app,
listen => ["http://127.0.0.1:$port"]);
$daemon->start;
$tx = $ua->get("http://127.0.0.1:$port" => {Connection => 'close'});
ok $tx->success, 'successful';
is $tx->res->code, 200, 'right status';
is $tx->res->body, 'works!', 'right content';
$daemon->stop;
$tx = $ua->inactivity_timeout(0.5)
->get("http://127.0.0.1:$port" => {Connection => 'close'});
ok !$tx->success, 'not successful';
is $tx->error, 'Inactivity timeout', 'right error';
$daemon->start;
$tx = $ua->inactivity_timeout(10)
->get("http://127.0.0.1:$port" => {Connection => 'close'});
ok $tx->success, 'successful';
is $tx->res->code, 200, 'right status';
is $tx->res->body, 'works!', 'right content';

done_testing();

0 comments on commit ba5388f

Please sign in to comment.