Skip to content

Commit

Permalink
added modules Mojo::Server::Prefork and Mojolicious::Command::prefork
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Jan 16, 2013
1 parent d3ef44a commit cc86d1a
Show file tree
Hide file tree
Showing 10 changed files with 675 additions and 232 deletions.
1 change: 1 addition & 0 deletions Changes
@@ -1,5 +1,6 @@

3.81 2013-01-16
- Added modules Mojo::Server::Prefork and Mojolicious::Command::prefork.
- Updated jQuery to version 1.9.
- Improved documentation.
- Fixed small memory leak in Hypnotoad that only shows when Perl is compiled
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Server/Daemon.pm
Expand Up @@ -27,7 +27,7 @@ sub run {
my $self = shift;

# Signals
$SIG{INT} = $SIG{TERM} = sub { exit 0 };
local $SIG{INT} = local $SIG{TERM} = sub { $self->ioloop->stop };

# Change user/group and start accepting connections
$self->start->setuidgid->ioloop->start;
Expand Down
265 changes: 41 additions & 224 deletions lib/Mojo/Server/Hypnotoad.pm
Expand Up @@ -4,26 +4,10 @@ use Mojo::Base -base;
# "Bender: I was God once.
# God: Yes, I saw. You were doing well, until everyone died."
use Cwd 'abs_path';
use Fcntl ':flock';
use File::Basename 'dirname';
use File::Spec::Functions qw(catfile tmpdir);
use IO::Poll 'POLLIN';
use List::Util 'shuffle';
use Mojo::Server::Daemon;
use POSIX qw(setsid WNOHANG);
use File::Spec::Functions 'catfile';
use Mojo::Server::Prefork;
use Scalar::Util 'weaken';
use Time::HiRes 'ualarm';

sub DESTROY {
my $self = shift;

# Worker or command
return unless $self->{finished};

# Manager
if (my $file = $self->{config}{pid_file}) { unlink $file if -w $file }
if (my $file = $self->{config}{lock_file}) { unlink $file if -w $file }
}

sub run {
my ($self, $path) = @_;
Expand All @@ -45,9 +29,13 @@ sub run {
die "Can't exec: $!" if !$ENV{HYPNOTOAD_REV}++ && !exec $ENV{HYPNOTOAD_EXE};

# Preload application and configure server
my $daemon = $self->{daemon} = Mojo::Server::Daemon->new;
my $app = $daemon->load_app($ENV{HYPNOTOAD_APP});
my $prefork = $self->{prefork} = Mojo::Server::Prefork->new;
my $app = $prefork->load_app($ENV{HYPNOTOAD_APP});
$self->_config($app);
weaken $self;
$prefork->on(manage => sub { $self->_manage });
$prefork->on(reap => sub { $self->_reap(pop) });
$prefork->on(finish => sub { $self->{finished} = 1 });

# Testing
_exit('Everything looks good!') if $ENV{HYPNOTOAD_TEST};
Expand All @@ -59,100 +47,54 @@ sub run {
$self->_hot_deploy unless $ENV{HYPNOTOAD_PID};

# Daemonize as early as possible (but not for restarts)
if (!$ENV{HYPNOTOAD_FOREGROUND} && $ENV{HYPNOTOAD_REV} < 3) {

# Fork and kill parent
die "Can't fork: $!" unless defined(my $pid = fork);
exit 0 if $pid;
setsid or die "Can't start a new session: $!";
$prefork->daemonize
if !$ENV{HYPNOTOAD_FOREGROUND} && $ENV{HYPNOTOAD_REV} < 3;

# Close file handles
open STDIN, '</dev/null';
open STDOUT, '>/dev/null';
open STDERR, '>&STDOUT';
}
# Clean manager environment
local $SIG{USR2} = sub { $self->{upgrade} ||= time };

# Start accepting connections
my $log = $self->{log} = $app->log;
$log->info(qq[Hypnotoad server $$ started for "$ENV{HYPNOTOAD_APP}".]);
$daemon->start;

# Pipe for worker communication
pipe($self->{reader}, $self->{writer}) or die "Can't create pipe: $!";
$self->{poll} = IO::Poll->new;
$self->{poll}->mask($self->{reader}, POLLIN);

# Clean manager environment
my $c = $self->{config};
$SIG{INT} = $SIG{TERM} = sub { $self->{finished} = 1 };
$SIG{CHLD} = sub {
while ((my $pid = waitpid -1, WNOHANG) > 0) { $self->_reap($pid) }
};
$SIG{QUIT} = sub { $self->{finished} = $self->{graceful} = 1 };
$SIG{USR2} = sub { $self->{upgrade} ||= time };
$SIG{TTIN} = sub { $c->{workers}++ };
$SIG{TTOU} = sub {
return unless $c->{workers} && $c->{workers}--;
$self->{workers}{shuffle keys %{$self->{workers}}}{graceful} ||= time;
};

# Mainloop
$self->_manage while 1;
$app->log->info(qq[Hypnotoad server $$ started for "$ENV{HYPNOTOAD_APP}".]);
$prefork->run;
}

sub _config {
my ($self, $app) = @_;

# Hypnotoad settings
my $c = $self->{config} = $app->config('hypnotoad') || {};
$c->{graceful_timeout} ||= 20;
$c->{heartbeat_interval} ||= 5;
$c->{heartbeat_timeout} ||= 20;
$c->{lock_file} ||= catfile tmpdir, 'hypnotoad.lock';
$c->{lock_file} .= ".$$";
$c->{lock_timeout} ||= 0.5;
$c->{pid_file} ||= catfile dirname($ENV{HYPNOTOAD_APP}), 'hypnotoad.pid';
$c->{upgrade_timeout} ||= 60;
$c->{workers} ||= 4;

# Daemon settings
my $c = $app->config('hypnotoad') || {};
$self->{upgrade_timeout} = $c->{upgrade_timeout} || 60;

# Prefork settings
$ENV{MOJO_REVERSE_PROXY} = $c->{proxy} if defined $c->{proxy};
my $daemon = $self->{daemon};
defined $c->{$_} and $daemon->$_($c->{$_}) for qw(backlog group user);
$daemon->max_clients($c->{clients} || 1000);
$daemon->max_requests($c->{keep_alive_requests} || 25);
$daemon->inactivity_timeout($c->{inactivity_timeout} // 15);
$daemon->listen($c->{listen} || ['http://*:8080']);
my $prefork = $self->{prefork};
$prefork->pid_file($c->{pid_file}
|| catfile(dirname($ENV{HYPNOTOAD_APP}), 'hypnotoad.pid'));
my @settings = (
qw(backlog graceful_timeout group heartbeat_interval),
qw(heartbeat_timeout lock_file lock_timeout user workers)
);
defined $c->{$_} and $prefork->$_($c->{$_}) for @settings;
$prefork->max_clients($c->{clients} || 1000);
$prefork->max_requests($c->{keep_alive_requests} || 25);
$prefork->inactivity_timeout($c->{inactivity_timeout} // 15);
$prefork->listen($c->{listen} || ['http://*:8080']);

# Event loop settings
my $loop = $daemon->ioloop;
my $loop = $prefork->ioloop;
$loop->max_accepts($c->{accepts} // 1000);
defined $c->{$_} and $loop->$_($c->{$_})
for qw(accept_interval multi_accept);
}

sub _exit { say shift and exit 0 }

sub _heartbeat {
my $self = shift;

# Poll for heartbeats
my $poll = $self->{poll};
$poll->poll(1);
return unless $poll->handles(POLLIN);
return unless $self->{reader}->sysread(my $chunk, 4194304);

# Update heartbeats
$self->{workers}{$1} and $self->{workers}{$1}{time} = time
while $chunk =~ /(\d+)\n/g;
}

sub _hot_deploy {
my $self = shift;

# Make sure server is running and clean up PID file if necessary
return unless defined(my $pid = $self->_pid);
my $file = $self->{config}{pid_file};
my $file = $self->{prefork}->pid_file;
return -w $file ? unlink $file : undef unless $pid && kill 0, $pid;

# Start hot deployment
Expand All @@ -163,169 +105,44 @@ sub _hot_deploy {
sub _manage {
my $self = shift;

# Housekeeping
my $c = $self->{config};
if (!$self->{finished}) {

# Spawn more workers
$self->_spawn while keys %{$self->{workers}} < $c->{workers};

# Check PID file
$self->_pid_file;
}

# Shutdown
elsif (!keys %{$self->{workers}}) { exit 0 }

# Upgraded
my $log = $self->{prefork}->app->log;
if ($ENV{HYPNOTOAD_PID} && $ENV{HYPNOTOAD_PID} ne $$) {
$self->{log}->info("Upgrade successful, stopping $ENV{HYPNOTOAD_PID}.");
$log->info("Upgrade successful, stopping $ENV{HYPNOTOAD_PID}.");
kill 'QUIT', $ENV{HYPNOTOAD_PID};
}
$ENV{HYPNOTOAD_PID} = $$ unless ($ENV{HYPNOTOAD_PID} // '') eq $$;

# Check heartbeat
$self->_heartbeat;

# Upgrade
if ($self->{upgrade} && !$self->{finished}) {

# Fresh start
unless ($self->{new}) {
$self->{log}->info('Starting zero downtime software upgrade.');
$log->info('Starting zero downtime software upgrade.');
die "Can't fork: $!" unless defined(my $pid = $self->{new} = fork);
exec($ENV{HYPNOTOAD_EXE}) or die("Can't exec: $!") unless $pid;
}

# Timeout
kill 'KILL', $self->{new}
if $self->{upgrade} + $c->{upgrade_timeout} <= time;
}

# Workers
while (my ($pid, $w) = each %{$self->{workers}}) {

# No heartbeat (graceful stop)
my $interval = $c->{heartbeat_interval};
my $timeout = $c->{heartbeat_timeout};
if (!$w->{graceful} && ($w->{time} + $interval + $timeout <= time)) {
$self->{log}->info("Worker $pid has no heartbeat, restarting.");
$w->{graceful} = time;
}

# Graceful stop with timeout
$w->{graceful} ||= time if $self->{graceful};
if ($w->{graceful}) {
$self->{log}->debug("Trying to stop worker $pid gracefully.");
kill 'QUIT', $pid;
$w->{force} = 1 if $w->{graceful} + $c->{graceful_timeout} <= time;
}

# Normal stop
if (($self->{finished} && !$self->{graceful}) || $w->{force}) {
$self->{log}->debug("Stopping worker $pid.");
kill 'KILL', $pid;
}
if $self->{upgrade} + $self->{upgrade_timeout} <= time;
}
}

sub _pid {
return undef unless open my $file, '<', shift->{config}{pid_file};
return undef unless open my $file, '<', shift->{prefork}->pid_file;
my $pid = <$file>;
chomp $pid;
return $pid;
}

sub _pid_file {
my $self = shift;

# Don't need a PID file anymore
return if $self->{finished};

# Check if PID file already exists
return if -e (my $file = $self->{config}{pid_file});

# Create PID file
$self->{log}->info(qq{Creating process id file "$file".});
die qq{Can't create process id file "$file": $!}
unless open my $pid, '>', $file;
chmod 0644, $pid;
print $pid $$;
}

sub _reap {
my ($self, $pid) = @_;

# Clean up failed upgrade
if (($self->{new} || '') eq $pid) {
$self->{log}->info('Zero downtime software upgrade failed.');
delete $self->{$_} for qw(new upgrade);
}

# Clean up worker
else {
$self->{log}->debug("Worker $pid stopped.");
delete $self->{workers}{$pid};
}
}

sub _spawn {
my $self = shift;

# Manager
die "Can't fork: $!" unless defined(my $pid = fork);
return $self->{workers}{$pid} = {time => time} if $pid;

# Prepare lock file
my $c = $self->{config};
my $file = $c->{lock_file};
die qq{Can't open lock file "$file": $!} unless open my $lock, '>', $file;

# Change user/group
my $loop = $self->{daemon}->setuidgid->ioloop;

# Accept mutex
$loop->lock(
sub {

# Blocking
my $l;
if ($_[1]) {
eval {
local $SIG{ALRM} = sub { die "alarm\n" };
my $old = ualarm $c->{lock_timeout} * 1000000;
$l = flock $lock, LOCK_EX;
ualarm $old;
};
if ($@) { $l = $@ eq "alarm\n" ? 0 : die($@) }
}

# Non blocking
else { $l = flock $lock, LOCK_EX | LOCK_NB }

return $l;
}
);
$loop->unlock(sub { flock $lock, LOCK_UN });

# Heartbeat messages (stop sending during graceful stop)
weaken $self;
$loop->recurring(
$c->{heartbeat_interval} => sub {
return unless shift->max_connections;
$self->{writer}->syswrite("$$\n") or exit 0;
}
);

# Clean worker environment
$SIG{$_} = 'DEFAULT' for qw(INT TERM CHLD USR2 TTIN TTOU);
$SIG{QUIT} = sub { $loop->max_connections(0) };
delete $self->{$_} for qw(poll reader);

# Start
$self->{log}->debug("Worker $$ started.");
$loop->start;
exit 0;
return unless ($self->{new} || '') eq $pid;
$self->{prefork}->app->log->info('Zero downtime software upgrade failed.');
delete $self->{$_} for qw(new upgrade);
}

sub _stop {
Expand All @@ -351,7 +168,7 @@ Mojo::Server::Hypnotoad - ALL GLORY TO THE HYPNOTOAD!
L<Mojo::Server::Hypnotoad> is a full featured, UNIX optimized, preforking
non-blocking I/O HTTP and WebSocket server, built around the very well tested
and reliable L<Mojo::Server::Daemon>, with C<IPv6>, C<TLS>, C<Comet> (long
and reliable L<Mojo::Server::Prefork>, with C<IPv6>, C<TLS>, C<Comet> (long
polling), multiple event loop and hot deployment support that just works. Note
that the server uses signals for process management, so you should avoid
modifying signal handlers in your applications.
Expand Down

0 comments on commit cc86d1a

Please sign in to comment.