Skip to content

Commit

Permalink
emit all events safely
Browse files (browse the repository at this point in the history)
  • Loading branch information
kraih committed Sep 10, 2014
1 parent d2bbee2 commit 05cf42a
Show file tree
Hide file tree
Showing 19 changed files with 80 additions and 75 deletions.
4 changes: 2 additions & 2 deletions lib/Mojo/Asset/Memory.pm
Expand Up @@ -15,7 +15,7 @@ sub add_chunk {
return $self
if !$self->auto_upgrade || $self->size <= $self->max_memory_size;
my $file = Mojo::Asset::File->new;
- return $file->add_chunk($self->emit(upgrade => $file)->slurp);
+ return $file->add_chunk($self->emit_safe(upgrade => $file)->slurp);
}

sub contains {
Expand Down Expand Up @@ -85,7 +85,7 @@ the following new ones.
...
});
- Emitted when asset gets upgraded to a L<Mojo::Asset::File> object.
+ Emitted safely when asset gets upgraded to a L<Mojo::Asset::File> object.
$mem->on(upgrade => sub {
my ($mem, $file) = @_;
Expand Down
16 changes: 8 additions & 8 deletions lib/Mojo/Content.pm
Expand Up @@ -40,7 +40,7 @@ sub clone {
sub generate_body_chunk {
my ($self, $offset) = @_;

- $self->emit(drain => $offset)
+ $self->emit_safe(drain => $offset)
if !delete $self->{delay} && !length($self->{body_buffer} // '');
my $chunk = delete $self->{body_buffer} // '';
return $self->{eof} ? '' : undef unless length $chunk;
Expand Down Expand Up @@ -88,7 +88,7 @@ sub parse {
# Headers
$self->_parse_until_body(@_);
return $self if $self->{state} eq 'headers';
- $self->emit('body') unless $self->{body}++;
+ $self->emit_safe('body') unless $self->{body}++;

# Chunked content
$self->{real_size} //= 0;
Expand Down Expand Up @@ -264,7 +264,7 @@ sub _parse_headers {
# Take care of leftovers
my $leftovers = $self->{pre_buffer} = $headers->leftovers;
$self->{header_size} = $self->{raw_size} - length $leftovers;
- $self->emit('body') unless $self->{body}++;
+ $self->emit_safe('body') unless $self->{body}++;
}

sub _parse_until_body {
Expand All @@ -284,15 +284,15 @@ sub _uncompress {
my ($self, $chunk) = @_;

# No compression
- return $self->emit(read => $chunk)
+ return $self->emit_safe(read => $chunk)
unless $self->is_compressed && $self->auto_relax;

# Uncompress
$self->{post_buffer} .= $chunk;
my $gz = $self->{gz}
//= Compress::Raw::Zlib::Inflate->new(WindowBits => WANT_GZIP);
my $status = $gz->inflate(\$self->{post_buffer}, my $out);
- $self->emit(read => $out) if defined $out;
+ $self->emit_safe(read => $out) if defined $out;
# Replace Content-Encoding with Content-Length
$self->headers->content_length($gz->total_out)->remove('Content-Encoding')
Expand Down Expand Up @@ -338,7 +338,7 @@ the following new ones.
...
});
- Emitted once all headers have been parsed and the body starts.
+ Emitted safely once all headers have been parsed and the body starts.
$content->on(body => sub {
my $content = shift;
Expand All @@ -352,7 +352,7 @@ Emitted once all headers have been parsed and the body starts.
...
});
- Emitted once all data has been written.
+ Emitted safely once all data has been written.
$content->on(drain => sub {
my $content = shift;
Expand All @@ -366,7 +366,7 @@ Emitted once all data has been written.
...
});
- Emitted when a new chunk of content arrives.
+ Emitted safely when a new chunk of content arrives.
$content->unsubscribe('read');
$content->on(read => sub {
Expand Down
4 changes: 2 additions & 2 deletions lib/Mojo/Content/MultiPart.pm
Expand Up @@ -141,7 +141,7 @@ sub _parse_multipart_boundary {

# New part
my $part = Mojo::Content::Single->new(relaxed => 1);
- $self->emit(part => $part);
+ $self->emit_safe(part => $part);
push @{$self->parts}, $part;
return !!($self->{multi_state} = 'multipart_body');
}
Expand Down Expand Up @@ -232,7 +232,7 @@ emit the following new ones.
...
});
- Emitted when a new L<Mojo::Content::Single> part starts.
+ Emitted safely when a new L<Mojo::Content::Single> part starts.
$multi->on(part => sub {
my ($multi, $single) = @_;
Expand Down
5 changes: 3 additions & 2 deletions lib/Mojo/Content/Single.pm
Expand Up @@ -47,7 +47,7 @@ sub parse {
# Content needs to be upgraded to multipart
$self->unsubscribe(read => $self->{read});
my $multi = Mojo::Content::MultiPart->new(%$self);
- $self->emit(upgrade => $multi);
+ $self->emit_safe(upgrade => $multi);
return $multi->parse;
}

Expand Down Expand Up @@ -85,7 +85,8 @@ emit the following new ones.
...
});
- Emitted when content gets upgraded to a L<Mojo::Content::MultiPart> object.
+ Emitted safely when content gets upgraded to a L<Mojo::Content::MultiPart>
+ object.
$single->on(upgrade => sub {
my ($single, $multi) = @_;
Expand Down
7 changes: 4 additions & 3 deletions lib/Mojo/IOLoop/Delay.pm
Expand Up @@ -58,7 +58,8 @@ sub _step {
or (++$self->{fail} and return $self->remaining([])->emit(error => $@));
}

- return $self->remaining([])->emit(finish => @args) unless $self->{counter};
+ return $self->remaining([])->emit_safe(finish => @args)
+ unless $self->{counter};
$self->ioloop->next_tick($self->begin) unless $self->{pending};
return $self;
}
Expand Down Expand Up @@ -155,8 +156,8 @@ unhandled.
...
});
- Emitted once the active event counter reaches zero and there are no more
- steps.
+ Emitted safely once the active event counter reaches zero and there are no
+ more steps.
=head1 ATTRIBUTES
Expand Down
4 changes: 2 additions & 2 deletions lib/Mojo/Log.pm
Expand Up @@ -48,7 +48,7 @@ sub is_level {

sub is_warn { shift->is_level('warn') }

- sub log { shift->emit('message', lc shift, @_) }
+ sub log { shift->emit_safe('message', lc shift, @_) }

sub new {
my $self = shift->SUPER::new(@_);
Expand Down Expand Up @@ -116,7 +116,7 @@ following new ones.
...
});
- Emitted when a new message gets logged.
+ Emitted safely when a new message gets logged.
$log->unsubscribe('message');
$log->on(message => sub {
Expand Down
13 changes: 7 additions & 6 deletions lib/Mojo/Message.pm
Expand Up @@ -86,7 +86,7 @@ sub extract_start_line {
sub finish {
my $self = shift;
$self->{state} = 'finished';
- return $self->{finished}++ ? $self : $self->emit('finish');
+ return $self->{finished}++ ? $self : $self->emit_safe('finish');
}

sub fix_headers {
Expand All @@ -106,7 +106,7 @@ sub fix_headers {
sub get_body_chunk {
my ($self, $offset) = @_;

- $self->emit('progress', 'body', $offset);
+ $self->emit_safe('progress', 'body', $offset);
my $chunk = $self->content->get_body_chunk($offset);
return $chunk if !defined $chunk || length $chunk;
$self->finish;
Expand All @@ -116,7 +116,7 @@ sub get_body_chunk {

sub get_header_chunk {
my ($self, $offset) = @_;
- $self->emit('progress', 'headers', $offset);
+ $self->emit_safe('progress', 'headers', $offset);
return $self->fix_headers->content->get_header_chunk($offset);
}

Expand Down Expand Up @@ -177,7 +177,8 @@ sub parse {
{message => 'Maximum buffer size exceeded', advice => 400})
if $self->content->is_limit_exceeded;

- return $self->emit('progress')->content->is_finished ? $self->finish : $self;
+ return $self unless $self->emit_safe('progress')->content->is_finished;
+ return $self->finish;
}

sub start_line_size { length shift->build_start_line }
Expand Down Expand Up @@ -327,7 +328,7 @@ the following new ones.
...
});
- Emitted after message building or parsing is finished.
+ Emitted safely after message building or parsing is finished.
my $before = time;
$msg->on(finish => sub {
Expand All @@ -342,7 +343,7 @@ Emitted after message building or parsing is finished.
...
});
- Emitted when message building or parsing makes progress.
+ Emitted safely when message building or parsing makes progress.
# Building
$msg->on(progress => sub {
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Message/Request.pm
Expand Up @@ -111,7 +111,7 @@ sub get_start_line_chunk {
$self->{start_buffer} = "$method $path HTTP/@{[$self->version]}\x0d\x0a";
}

- $self->emit(progress => 'start_line', $offset);
+ $self->emit_safe(progress => 'start_line', $offset);
return substr $self->{start_buffer}, $offset, 131072;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Message/Response.pm
Expand Up @@ -124,7 +124,7 @@ sub get_start_line_chunk {
$self->{start_buffer} = "HTTP/@{[$self->version]} $code $msg\x0d\x0a";
}

- $self->emit(progress => 'start_line', $offset);
+ $self->emit_safe(progress => 'start_line', $offset);
return substr $self->{start_buffer}, $offset, 131072;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Server.pm
Expand Up @@ -132,7 +132,7 @@ the following new ones.
...
});
- Emitted when a request is ready and needs to be handled.
+ Emitted safely when a request is ready and needs to be handled.
$server->unsubscribe('request');
$server->on(request => sub {
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Server/CGI.pm
Expand Up @@ -21,7 +21,7 @@ sub run {
}

# Handle request
- $self->emit(request => $tx);
+ $self->emit_safe(request => $tx);

# Response start line
STDOUT->autoflush(1);
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Server/Daemon.pm
Expand Up @@ -80,7 +80,7 @@ sub _build_tx {
$tx->on(
request => sub {
my $tx = shift;
- $self->emit(request => $self->{connections}{$id}{ws} || $tx);
+ $self->emit_safe(request => $self->{connections}{$id}{ws} || $tx);
$tx->on(resume => sub { $self->_write($id) });
}
);
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Server/PSGI.pm
Expand Up @@ -18,7 +18,7 @@ sub run {
}

# Handle request
- $self->emit(request => $tx);
+ $self->emit_safe(request => $tx);

# Response headers
my $res = $tx->res->fix_headers;
Expand Down
20 changes: 10 additions & 10 deletions lib/Mojo/Server/Prefork.pm
Expand Up @@ -79,7 +79,7 @@ sub run {
local $SIG{CHLD} = sub {
while ((my $pid = waitpid -1, WNOHANG) > 0) {
$self->app->log->debug("Worker $pid stopped.")
- if delete $self->emit(reap => $pid)->{pool}{$pid};
+ if delete $self->emit_safe(reap => $pid)->{pool}{$pid};
}
};
local $SIG{QUIT} = sub { $self->_term(1) };
Expand Down Expand Up @@ -109,7 +109,7 @@ sub _heartbeat {
my $time = steady_time;
while ($chunk =~ /(\d+):(\d)\n/g) {
next unless my $w = $self->{pool}{$1};
- $self->emit(heartbeat => $1) and $w->{time} = $time;
+ $self->emit_safe(heartbeat => $1) and $w->{time} = $time;
$w->{graceful} ||= $time if $2;
}
}
Expand All @@ -127,7 +127,7 @@ sub _manage {
elsif (!keys %{$self->{pool}}) { return delete $self->{running} }

# Wait for heartbeats
- $self->emit('wait')->_heartbeat;
+ $self->emit_safe('wait')->_heartbeat;

my $interval = $self->heartbeat_interval;
my $ht = $self->heartbeat_timeout;
Expand Down Expand Up @@ -161,7 +161,7 @@ sub _spawn {

# Manager
die "Can't fork: $!" unless defined(my $pid = fork);
- return $self->emit(spawn => $pid)->{pool}{$pid} = {time => steady_time}
+ return $self->emit_safe(spawn => $pid)->{pool}{$pid} = {time => steady_time}
if $pid;

# Prepare lock file
Expand Down Expand Up @@ -217,7 +217,7 @@ sub _spawn {

sub _term {
my ($self, $graceful) = @_;
- $self->emit(finish => $graceful)->{finished} = 1;
+ $self->emit_safe(finish => $graceful)->{finished} = 1;
$self->{graceful} = 1 if $graceful;
}

Expand Down Expand Up @@ -316,7 +316,7 @@ can emit the following new ones.
...
});
- Emitted when the server shuts down.
+ Emitted safely when the server shuts down.
$prefork->on(finish => sub {
my ($prefork, $graceful) = @_;
Expand All @@ -330,7 +330,7 @@ Emitted when the server shuts down.
...
});
- Emitted when a heartbeat message has been received from a worker.
+ Emitted safely when a heartbeat message has been received from a worker.
$prefork->on(heartbeat => sub {
my ($prefork, $pid) = @_;
Expand All @@ -344,7 +344,7 @@ Emitted when a heartbeat message has been received from a worker.
...
});
- Emitted when a child process dies.
+ Emitted safely when a child process dies.
$prefork->on(reap => sub {
my ($prefork, $pid) = @_;
Expand All @@ -358,7 +358,7 @@ Emitted when a child process dies.
...
});
- Emitted when a worker process is spawned.
+ Emitted safely when a worker process is spawned.
$prefork->on(spawn => sub {
my ($prefork, $pid) = @_;
Expand All @@ -372,7 +372,7 @@ Emitted when a worker process is spawned.
...
});
- Emitted when the manager starts waiting for new heartbeat messages.
+ Emitted safely when the manager starts waiting for new heartbeat messages.
$prefork->on(wait => sub {
my $prefork = shift;
Expand Down

0 comments on commit 05cf42a

Please sign in to comment.