Skip to content

Commit

Permalink
accepted connections need to be handled separately
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Mar 2, 2016
1 parent e64bcc2 commit 912bfac
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 25 deletions.
1 change: 1 addition & 0 deletions Changes
@@ -1,5 +1,6 @@

6.52 2016-03-02
- Fixed a few concurrency bugs in Mojo::IOLoop.

6.51 2016-02-29
- Fixed bug in Mojolicious::Plugin::EPLRenderer where empty templates from the
Expand Down
44 changes: 23 additions & 21 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -50,13 +50,13 @@ sub client {
my ($self, $cb) = (_instance(shift), pop);

my $id = $self->_id;
-my $client = $self->{connections}{$id}{client} = Mojo::IOLoop::Client->new;
+my $client = $self->{out}{$id}{client} = Mojo::IOLoop::Client->new;
weaken $client->reactor($self->reactor)->{reactor};

weaken $self;
$client->on(
connect => sub {
-delete $self->{connections}{$id}{client};
+delete $self->{out}{$id}{client};
my $stream = Mojo::IOLoop::Stream->new(pop);
$self->_stream($stream => $id);
$self->$cb(undef, $stream);
Expand Down Expand Up @@ -88,14 +88,14 @@ sub recurring { shift->_timer(recurring => @_) }

sub remove {
my ($self, $id) = (_instance(shift), @_);
-my $c = $self->{connections}{$id};
+my $c = $self->{in}{$id} || $self->{out}{$id};
if ($c && (my $stream = $c->{stream})) { return $stream->close_gracefully }
$self->_remove($id);
}

sub reset {
my $self = _instance(shift);
-delete @$self{qw(accepting acceptors connections stop)};
+delete @$self{qw(accepting acceptors in out stop)};
$self->reactor->reset;
$self->stop;
}
Expand All @@ -115,7 +115,7 @@ sub server {
}

my $stream = Mojo::IOLoop::Stream->new(pop);
-$self->$cb($stream, $self->stream($stream));
+$self->$cb($stream, $self->_stream($stream, $self->_id, 1));

# Stop accepting if connection limit has been reached
$self->_not_accepting if $self->_limit;
Expand Down Expand Up @@ -143,8 +143,9 @@ sub stop_gracefully {

sub stream {
my ($self, $stream) = (_instance(shift), @_);
-return ($self->{connections}{$stream} || {})->{stream} unless ref $stream;
-return $self->_stream($stream => $self->_id);
+return $self->_stream($stream => $self->_id) if ref $stream;
+my $c = $self->{in}{$stream} || $self->{out}{$stream} || {};
+return $c->{stream};
}

sub timer { shift->_timer(timer => @_) }
Expand All @@ -153,17 +154,15 @@ sub _id {
my $self = shift;
my $id;
do { $id = md5_sum 'c' . steady_time . rand 999 }
-while $self->{connections}{$id} || $self->{acceptors}{$id};
+while $self->{in}{$id} || $self->{out}{$id} || $self->{acceptors}{$id};
return $id;
}

+sub _in { scalar keys %{shift->{in} || {}} }

sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton }

-sub _limit {
-my $self = shift;
-return 1 if $self->{stop};
-return keys %{$self->{connections}} >= $self->max_connections;
-}
+sub _limit { $_[0]{stop} ? 1 : $_[0]->_in >= $_[0]->max_connections }

sub _maybe_accepting {
my $self = shift;
Expand All @@ -179,6 +178,8 @@ sub _not_accepting {
return $self;
}

+sub _out { scalar keys %{shift->{out} || {}} }

sub _remove {
my ($self, $id) = @_;

Expand All @@ -191,24 +192,24 @@ sub _remove {
if delete $self->{acceptors}{$id};

# Connection
-return unless delete $self->{connections}{$id};
+return unless delete $self->{in}{$id} || delete $self->{out}{$id};
$self->_maybe_accepting;
-warn "-- $id <<< $$ (@{[scalar keys %{$self->{connections}}]})\n" if DEBUG;
+warn "-- $id <<< $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
}

sub _stop {
my $self = shift;
-return if keys %{$self->{connections}};
+return if $self->_out || $self->_in;
$self->_remove(delete $self->{stop});
$self->stop;
}

sub _stream {
-my ($self, $stream, $id) = @_;
+my ($self, $stream, $id, $server) = @_;

# Connect stream with reactor
-$self->{connections}{$id}{stream} = $stream;
-warn "-- $id >>> $$ (@{[scalar keys %{$self->{connections}}]})\n" if DEBUG;
+$self->{$server ? 'in' : 'out'}{$id}{stream} = $stream;
+warn "-- $id >>> $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
weaken $stream->reactor($self->reactor)->{reactor};
weaken $self;
$stream->on(close => sub { $self && $self->_remove($id) });
Expand Down Expand Up @@ -341,8 +342,9 @@ randomly to improve load balancing between multiple server processes.
my $max = $loop->max_connections;
$loop = $loop->max_connections(1000);
-The maximum number of concurrent connections this event loop is allowed to
-handle before stopping to accept new incoming connections, defaults to C<1000>.
+The maximum number of accepted connections this event loop is allowed to handle
+concurrently, before stopping to accept new incoming connections, defaults to
+C<1000>.
=head2 multi_accept
Expand Down
4 changes: 2 additions & 2 deletions lib/Mojo/Server/Daemon.pm
Expand Up @@ -427,8 +427,8 @@ TLS protocol version.
my $max = $daemon->max_clients;
$daemon = $daemon->max_clients(1000);
-Maximum number of concurrent connections this server is allowed to handle
-before stopping to accept new incoming connections, passed along to
+Maximum number of accepted connections this server is allowed to handle
+concurrently before stopping to accept new incoming connections, passed along to
L<Mojo::IOLoop/"max_connections">.
=head2 max_requests
Expand Down
4 changes: 2 additions & 2 deletions lib/Mojo/Server/Hypnotoad.pm
Expand Up @@ -256,8 +256,8 @@ L<Mojo::Server::Daemon/"backlog">.
clients => 100
-Maximum number of concurrent connections each worker process is allowed to
-handle before stopping to accept new incoming connections, defaults to the
+Maximum number of accepted connections each worker process is allowed to handle
+concurrently before stopping to accept new incoming connections, defaults to the
value of L<Mojo::IOLoop/"max_connections">. Note that high concurrency works
best with applications that perform mostly non-blocking operations, to optimize
for blocking operations you can decrease this value and increase L</"workers">
Expand Down

0 comments on commit 912bfac

Please sign in to comment.