Skip to content

Commit

Permalink
add EXPERIMENTAL support for performing expensive blocking operations…
Browse files Browse the repository at this point in the history
… in subprocesses
  • Loading branch information
kraih committed Aug 26, 2016
1 parent 997d21f commit 4f38a66
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 1 deletion.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -13,6 +13,7 @@ env:
- TEST_EV=1
- TEST_POD=1
- TEST_SOCKS=1
- TEST_SUBPROCESS=1
- TEST_TLS=1
install:
- cpanm -n EV IO::Socket::Socks IO::Socket::SSL Net::DNS::Native Test::Pod Test::Pod::Coverage
Expand Down
6 changes: 5 additions & 1 deletion Changes
@@ -1,5 +1,9 @@

7.04 2016-08-21
7.04 2016-08-26
- Added EXPERIMENTAL support for performing expensive blocking operations in
subprocesses. (jberger, sri)
- Added EXPERIMENTAL module Mojo::IOLoop::Subprocess. (jberger, sri)
- Added EXPERIMENTAL subprocess method to Mojo::IOLoop. (jberger, sri)
- Fixed bug where Mojo::UserAgent would try to follow redirects for CONNECT
requests.

Expand Down
29 changes: 29 additions & 0 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -8,6 +8,7 @@ use Mojo::IOLoop::Client;
use Mojo::IOLoop::Delay;
use Mojo::IOLoop::Server;
use Mojo::IOLoop::Stream;
use Mojo::IOLoop::Subprocess;
use Mojo::Reactor::Poll;
use Mojo::Util qw(md5_sum steady_time);
use Scalar::Util qw(blessed weaken);
Expand Down Expand Up @@ -145,6 +146,12 @@ sub stream {
return $c->{stream};
}

sub subprocess {
my $sp = Mojo::IOLoop::Subprocess->new;
weaken $sp->ioloop(_instance(shift))->{ioloop};
return $sp->run(@_);
}

sub timer { shift->_timer(timer => @_) }

sub _id {
Expand Down Expand Up @@ -595,6 +602,28 @@ Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
# Increase inactivity timeout for connection to 300 seconds
Mojo::IOLoop->stream($id)->timeout(300);
=head2 subprocess
my $sp = Mojo::IOLoop->subprocess(sub {...}, sub {...});
my $sp = $loop->subprocess(sub {...}, sub {...});
Create a new subprocess and return a L<Mojo::IOLoop::Subprocess> object for it.
Callbacks will be passed along to L<Mojo::IOLoop::Subprocess/"run">. Note that
this method is EXPERIMENTAL and might change without warning!
# Perform an expensive blocking operation in a subprocess
Mojo::IOLoop->subprocess(
sub {
my $sp = shift;
sleep 5;
return 1 + 1, 2 + 2;
},
sub {
my ($sp, $err, @results) = @_;
say "The results are $results[0] and $results[1]";
}
);
=head2 timer
my $id = Mojo::IOLoop->timer(3 => sub {...});
Expand Down
144 changes: 144 additions & 0 deletions lib/Mojo/IOLoop/Subprocess.pm
@@ -0,0 +1,144 @@
package Mojo::IOLoop::Subprocess;
use Mojo::Base -base;

use Config;
use Mojo::IOLoop;
use Mojo::IOLoop::Stream;
use Storable;

has deserialize => sub { \&Storable::thaw };
has ioloop => sub { Mojo::IOLoop->singleton };
has serialize => sub { \&Storable::freeze };

sub pid { shift->{pid} }

sub run {
my ($self, $first, $second) = @_;

# No fork emulation support
say 'Subprocesses do not support fork emulation.' and exit 0
if $Config{d_pseudofork};

# Pipe for subprocess communication
pipe(my $reader, my $writer) or die "Can't create pipe: $!";

# Child
die "Can't fork: $!" unless defined($self->{pid} = fork);
unless ($self->{pid}) {
$self->ioloop->reset;
print $writer $self->serialize->([$self->$first]);
exit 0;
}

# Parent
my $stream = Mojo::IOLoop::Stream->new($reader);
$self->ioloop->stream($stream);
my $buffer;
$stream->on(read => sub { $buffer .= pop });
$stream->on(
close => sub {
waitpid $self->{pid}, 0;
return $self->$second("Non-zero exit status (@{[$? >> 8]})") if $?;
my $result = eval { $self->deserialize->($buffer) } || [];
$self->$second($@, @$result);
}
);
return $self;
}

1;

=encoding utf8
=head1 NAME
Mojo::IOLoop::Subprocess - Subprocesses
=head1 SYNOPSIS
use Mojo::IOLoop::Subprocess;
# Perform an expensive blocking operation in a subprocess
my $sp = Mojo::IOLoop::Subprocess->new;
$sp->run(
sub {
my $sp = shift;
sleep 5;
return 1 + 1, 2 + 2;
},
sub {
my ($sp, $err, @results) = @_;
say "The results are $results[0] and $results[1]";
}
);
# Start event loop if necessary
$sp->ioloop->start unless $sp->ioloop->is_running;
=head1 DESCRIPTION
L<Mojo::IOLoop::Subprocess> allows L<Mojo::IOLoop> to process expensive blocking
operations in subprocesses. Note that this module is EXPERIMENTAL and might
change without warning!
=head1 ATTRIBUTES
L<Mojo::IOLoop::Subprocess> implements the following attributes.
=head2 deserialize
my $cb = $sp->deserialize;
$sp = $sp->deserialize(sub {...});
A callback used to deserialize subprocess return values, defaults to using
L<Storable>.
$sessions->deserialize(sub {
my $bytes = shift;
return {};
});
=head2 ioloop
my $loop = $sp->ioloop;
$sp = $sp->ioloop(Mojo::IOLoop->new);
Event loop object to control, defaults to the global L<Mojo::IOLoop> singleton.
=head2 serialize
my $cb = $sp->serialize;
$sp = $sp->serialize(sub {...});
A callback used to serialize subprocess return values, defaults to using
L<Storable>.
$sessions->serialize(sub {
my $array = shift;
return '';
});
=head1 METHODS
L<Mojo::IOLoop::Subprocess> inherits all methods from L<Mojo::Base> and
implements the following new ones.
=head2 pid
my $pid = $sp->pid;
Process id of the spawned subprocess if available.
=head2 run
$sp = $sp->run(sub {...}, sub {...});
Execute the first callback in a child process and wait for it to return one or
more values, before executing the second callback in the parent process with the
results.
=head1 SEE ALSO
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicious.org>.
=cut
2 changes: 2 additions & 0 deletions lib/Mojolicious/Guides.pod
Expand Up @@ -323,6 +323,8 @@ This is the class hierarchy of the L<Mojolicious> distribution.

=item * L<Mojo::Home>

=item * L<Mojo::IOLoop::Subprocess>

=item * L<Mojo::JSON::Pointer>

=item * L<Mojo::Parameters>
Expand Down
106 changes: 106 additions & 0 deletions t/mojo/subprocess.t
@@ -0,0 +1,106 @@
use Mojo::Base -strict;

BEGIN { $ENV{MOJO_REACTOR} = 'Mojo::Reactor::Poll' }

use Test::More;

plan skip_all => 'set TEST_SUBPROCESS to enable this test (developer only!)'
unless $ENV{TEST_SUBPROCESS};

use Mojo::IOLoop;
use Mojo::IOLoop::Subprocess;

# Huge result
my ($fail, $result);
my $sp = Mojo::IOLoop::Subprocess->new;
$sp->run(
sub { shift->pid . $$ . ('x' x 100000) },
sub {
my ($sp, $err, $two) = @_;
$fail = $err;
$result = $two;
}
);
Mojo::IOLoop->start;
ok !$fail, 'no error';
is $result, 0 . $sp->pid . ('x' x 100000), 'right result';

# Multiple return values
($fail, $result) = ();
$sp = Mojo::IOLoop::Subprocess->new;
$sp->run(
sub { return 1, [{two => 2}], 3 },
sub {
my ($sp, $err, @results) = @_;
$fail = $err;
$result = \@results;
}
);
Mojo::IOLoop->start;
ok !$fail, 'no error';
is_deeply $result, [1, [{two => 2}], 3], 'right structure';

# Event loop in subprocess
($fail, $result) = ();
$sp = Mojo::IOLoop::Subprocess->new;
$sp->run(
sub {
my $result;
Mojo::IOLoop->next_tick(sub { $result = 23 });
Mojo::IOLoop->start;
return $result;
},
sub {
my ($sp, $err, $twenty_three) = @_;
$fail = $err;
$result = $twenty_three;
}
);
Mojo::IOLoop->start;
ok !$fail, 'no error';
is $result, 23, 'right result';

# Concurrent subprocesses
($fail, $result) = ();
Mojo::IOLoop->delay(
sub {
my $delay = shift;
Mojo::IOLoop->subprocess(sub {1}, $delay->begin);
Mojo::IOLoop->subprocess(sub {2}, $delay->begin);
},
sub {
my ($delay, $err1, $result1, $err2, $result2) = @_;
$fail = $err1 || $err2;
$result = [$result1, $result2];
}
)->wait;
ok !$fail, 'no error';
is_deeply $result, [1, 2], 'right structure';

# Non-zero exit status
$fail = undef;
Mojo::IOLoop::Subprocess->new->run(
sub { exit 3 },
sub {
my ($sp, $err) = @_;
$fail = $err;
}
);
Mojo::IOLoop->start;
is $fail, 'Non-zero exit status (3)', 'right error';

# Serialization error
$fail = undef;
$sp = Mojo::IOLoop::Subprocess->new;
$sp->deserialize(sub { die 'Whatever' });
$sp->run(
sub { 1 + 1 },
sub {
my ($sp, $err) = @_;
$fail = $err;
}
);
Mojo::IOLoop->start;
like $fail, qr/Whatever/, 'right error';

done_testing();

0 comments on commit 4f38a66

Please sign in to comment.