Skip to content

Commit

Permalink
improved Mojo::IOLoop to be controllable from foreign event loops
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Feb 16, 2012
1 parent d42e710 commit ad6b54c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 26 deletions.
3 changes: 2 additions & 1 deletion Changes
@@ -1,6 +1,7 @@
This file documents the revision history for Perl extension Mojolicious.

2.50 2012-02-15 00:00:00
2.50 2012-02-17 00:00:00
- Improved Mojo::IOLoop to be controllable from foreign event loops.
- Improved documentation.
- Fixed small bug in makefile command.

Expand Down
54 changes: 30 additions & 24 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -36,6 +36,9 @@ sub client {
my $cb = pop;
my $args = ref $_[0] ? $_[0] : {@_};

# Make sure garbage gets collected
$self->_cleaner;

# New client
my $client = $self->client_class->new;
my $id = $args->{id} || $self->_id;
Expand Down Expand Up @@ -121,6 +124,9 @@ sub server {
$self = $self->singleton unless ref $self;
my $cb = pop;

# Make sure garbage gets collected
$self->_cleaner;

# New server
my $server = $self->server_class->new;
my $id = $self->_id;
Expand Down Expand Up @@ -158,44 +164,24 @@ sub singleton { state $loop ||= shift->SUPER::new }
# Start the loop. Can be called as a class method, in which case the
# singleton instance is used. Dies via croak if the "running" flag is already
# set, otherwise returns the invocant.
# NOTE(review): this listing comes from a diff view without +/- markers, so it
# may mix lines from before and after the commit. The connection-management
# callback below also appears verbatim in "_cleaner", which suggests it was
# moved out of this sub -- confirm against the repository.
sub start {
my $self = shift;
$self = $self->singleton unless ref $self;

# Check if we are already running (post-increment: the test sees the old
# value of the flag, and the flag is truthy afterwards either way)
croak 'Mojo::IOLoop already running' if $self->{running}++;

# Mainloop: register a recurring housekeeping callback; '0.025' is presumably
# the interval in seconds -- TODO confirm against "recurring"
my $id = $self->recurring(
'0.025' => sub {
# Fresh lexical taken from the callback arguments rather than the outer $self
my $self = shift;

# Manage connections: drop every connection flagged "finish" that has no
# stream or whose stream is not writing
$self->_listening;
my $connections = $self->{connections} ||= {};
while (my ($id, $c) = each %$connections) {
$self->_drop($id)
if $c->{finish} && (!$c->{stream} || !$c->{stream}->is_writing);
}

# Graceful shutdown: stop once max_connections is 0 and no connections remain
$self->stop if $self->max_connections == 0 && keys %$connections == 0;
}
);
# Hand control to the watcher (presumably blocks until it is stopped), then
# remove the housekeeping callback registered above
$self->iowatcher->start;
$self->drop($id);

return $self;
}

# Stop the loop. Can be called as a class method, in which case the singleton
# instance is used.
# NOTE(review): this listing comes from a diff view without +/- markers. The
# unconditional "delete" + "stop" pair and the conditional one-liner look like
# the old and new forms of the same logic; read literally, the one-liner could
# never fire because the flag has already been deleted -- confirm against the
# repository.
sub stop {
my $self = shift;
$self = $self->singleton unless ref $self;
# Clear the "running" flag and stop the watcher unconditionally
delete $self->{running};
$self->iowatcher->stop;
# "delete" returns the removed value, so this form stops the watcher only if
# the "running" flag was set
$self->iowatcher->stop if delete $self->{running};
}

sub stream {
my $self = shift;
$self = $self->singleton unless ref $self;

# Make sure garbage gets collected
$self->_cleaner;

# Find stream for id
my $stream = shift;
unless (blessed $stream) {
Expand Down Expand Up @@ -225,6 +211,26 @@ sub timer {
return $self->iowatcher->timer($after => sub { $self->$cb(pop) });
}

# Install the recurring housekeeping callback at most once per loop object.
# The value returned by "recurring" is cached in $self->{cleaner}, so repeated
# calls are cheap and do not register a second callback.
sub _cleaner {
  my $loop = shift;

  return $loop->{cleaner} ||= $loop->recurring(
    '0.025' => sub {

      # Take the loop from the callback arguments instead of closing over the
      # outer lexical
      my $loop = shift;

      # Manage connections
      $loop->_listening;
      my $pool = $loop->{connections} ||= {};
      while (my ($cid, $conn) = each %$pool) {

        # Only connections flagged as finished are candidates
        next unless $conn->{finish};

        # Keep the connection while its stream is still writing
        next if $conn->{stream} && $conn->{stream}->is_writing;

        $loop->_drop($cid);
      }

      # Graceful shutdown once no connections are allowed or left
      $loop->stop if $loop->max_connections == 0 && keys %$pool == 0;
    }
  );
}

sub _drop {
my ($self, $id) = @_;

Expand Down
27 changes: 26 additions & 1 deletion t/mojo/iowatcher_ev.t
Expand Up @@ -8,7 +8,7 @@ use Test::More;
plan skip_all => 'set TEST_EV to enable this test (developer only!)'
unless $ENV{TEST_EV};
plan skip_all => 'EV 4.0 required for this test!' unless eval 'use EV 4.0; 1';
plan tests => 57;
plan tests => 60;

use IO::Socket::INET;
use Mojo::IOLoop;
Expand Down Expand Up @@ -184,3 +184,28 @@ package main;

# Detection (env)
is(Mojo::IOWatcher->detect, 'Mojo::IOWatcher::Test', 'right class');

# EV in control: run a Mojo::IOLoop server and client while EV itself drives
# the event loop (EV::run) instead of Mojo::IOLoop->start.
# Clear the watcher override (presumably set for the detection test above)
undef $ENV{MOJO_IOWATCHER};
isa_ok(Mojo::IOLoop->singleton->iowatcher,
  'Mojo::IOWatcher::EV', 'right object');
$port = Mojo::IOLoop->generate_port;

# Reset both receive buffers; a list assignment needs one value per variable,
# otherwise $client would be left undefined
($server, $client) = ('', '');

# Server side: send "test" followed by "321" and collect whatever is read
Mojo::IOLoop->server(
  {port => $port} => sub {
    my ($loop, $stream) = @_;
    $stream->write('test', sub { shift->write('321') });
    $stream->on(read => sub { $server .= pop });
  }
);

# Client side: send "tset" followed by "123" and collect whatever is read
Mojo::IOLoop->client(
  {port => $port} => sub {
    my ($loop, $err, $stream) = @_;
    $stream->write('tset', sub { shift->write('123') });
    $stream->on(read => sub { $client .= pop });
  }
);

# Leave the EV loop again via a timer so the assertions below can run
Mojo::IOLoop->timer(1 => sub { EV::break(EV::BREAK_ONE()) });
EV::run();

# Each side must have received exactly what the other side wrote
is $server, 'tset123', 'right content';
is $client, 'test321', 'right content';

0 comments on commit ad6b54c

Please sign in to comment.