Skip to content

Commit

Permalink
Merge pull request #1141 from kraih/promises
Browse files Browse the repository at this point in the history
Promises/A+ support
  • Loading branch information
kraih committed Oct 25, 2017
2 parents 82b0f09 + 93eff35 commit bbe18c3
Show file tree
Hide file tree
Showing 4 changed files with 396 additions and 25 deletions.
22 changes: 17 additions & 5 deletions lib/Mojo/IOLoop.pm
Expand Up @@ -406,10 +406,22 @@ takes the same arguments as L<Mojo::IOLoop::Client/"connect">.
my $delay = $loop->delay(sub {...});
my $delay = $loop->delay(sub {...}, sub {...});
Build L<Mojo::IOLoop::Delay> object to manage callbacks and control the flow of
events for this event loop, which can help you avoid deep nested closures that
often result from continuation-passing style. Callbacks will be passed along to
L<Mojo::IOLoop::Delay/"steps">.
Build L<Mojo::IOLoop::Delay> object to use as a promise or for flow-control.
Callbacks will be passed along to L<Mojo::IOLoop::Delay/"steps">.
# Wrap continuation-passing style APIs with promises
my $ua = Mojo::UserAgent->new;
sub get {
my $promise = Mojo::IOLoop->delay;
$ua->get(@_ => sub {
my ($ua, $tx) = @_;
$promise->resolve($tx);
});
return $promise;
}
my $mojo = get('http://mojolicious.org');
my $cpan = get('http://metacpan.org');
$mojo->race($cpan)->then(sub { say shift->req->url })->wait;
# Synchronize multiple non-blocking operations
my $delay = Mojo::IOLoop->delay(sub { say 'BOOM!' });
Expand Down Expand Up @@ -455,7 +467,7 @@ L<Mojo::IOLoop::Delay/"steps">.
say 'Never actually reached.';
}
)->catch(sub {
my ($delay, $err) = @_;
my $err = shift;
say "Something went wrong: $err";
})->wait;
Expand Down
218 changes: 203 additions & 15 deletions lib/Mojo/IOLoop/Delay.pm
Expand Up @@ -3,36 +3,127 @@ use Mojo::Base 'Mojo::EventEmitter';

use Mojo::IOLoop;
use Mojo::Util;
use Scalar::Util qw(blessed weaken);

has ioloop => sub { Mojo::IOLoop->singleton };
has remaining => sub { [] };

sub all {
my @promises = @_;

my $all = $promises[0]->_clone;

my $results = [];
my $remaining = scalar @promises;
for my $i (0 .. $#promises) {
$promises[$i]->then(
sub {
$results->[$i] = [@_];
$all->resolve(@$results) if --$remaining <= 0;
},
sub { $all->reject(@_) },
);
}

return $all;
}

sub begin {
my ($self, $offset, $len) = @_;
$self->{pending}++;
my $id = $self->{counter}++;
return sub { $self->_step($id, $offset // 1, $len, @_) };
}

sub catch { shift->then(undef, shift) }

sub data { Mojo::Util::_stash(data => @_) }

sub finally {
my ($self, $finally) = @_;

my $new = $self->_clone;
my $cb = sub {
my ($method, @result) = @_;
my ($promise) = eval { $finally->(@result) };
if ($promise && blessed $promise && $promise->can('then')) {
return $promise->then(sub { $new->$method(@result) },
sub { $new->$method(@result) });
}
$new->$method(@result);
();
};

push @{$self->{resolve}}, sub { $cb->('resolve', @_) };
push @{$self->{reject}}, sub { $cb->('reject', @_) };

$self->_defer if $self->{result};

return $new;
}

sub pass { $_[0]->begin->(@_) }

sub race {
my @promises = @_;
my $race = $promises[0]->_clone;
$_->then(sub { $race->resolve(@_) }, sub { $race->reject(@_) }) for @promises;
return $race;
}

sub reject { shift->_settle('reject', @_) }
sub resolve { shift->_settle('resolve', @_) }

sub steps {
my $self = shift->remaining([@_]);
$self->ioloop->next_tick($self->begin);
return $self;
}

sub then {
my ($self, $resolve, $reject) = @_;

my $new = $self->_clone;
push @{$self->{resolve}}, $self->_wrap('resolve', $new, $resolve);
push @{$self->{reject}}, $self->_wrap('reject', $new, $reject);

$self->_defer if $self->{result};

return $new;
}

sub wait {
my $self = shift;
return if $self->ioloop->is_running;
$self->once(error => \&_die);
$self->once(finish => sub { shift->ioloop->stop });
$self->ioloop->start;
my $loop = $self->ioloop;
$self->finally(sub { $loop->stop });
$loop->start;
}

sub _clone {
my $self = shift;
my $clone = $self->new;
weaken $clone->ioloop($self->ioloop)->{ioloop};
return $clone;
}

sub _defer {
my $self = shift;

my $cbs = $self->{status} eq 'resolve' ? $self->{resolve} : $self->{reject};
@$self{qw(resolve reject)} = ([], []);
my $results = $self->{result};

$self->ioloop->next_tick(sub { $_->(@$results) for @$cbs });
}

sub _die { $_[0]->has_subscribers('error') ? $_[0]->ioloop->stop : die $_[1] }
sub _settle {
my ($self, $status) = (shift, shift);
return $self if $self->{result};
@{$self}{qw(result status)} = ([@_], $status);
$self->_defer;
return $self;
}

sub _step {
my ($self, $id, $offset, $len) = (shift, shift, shift, shift);
Expand All @@ -45,27 +136,66 @@ sub _step {

$self->{counter} = 0;
if (my $cb = shift @{$self->remaining}) {
eval { $self->$cb(@args); 1 }
or (++$self->{fail} and return $self->remaining([])->emit(error => $@));
unless (eval { $self->$cb(@args); 1 }) {
my $err = $@;
$self->{fail}++;
return $self->remaining([])->reject($err)->emit(error => $err);
}
}

return $self->remaining([])->emit(finish => @args) unless $self->{counter};
return $self->remaining([])->resolve(@args)->emit(finish => @args)
unless $self->{counter};
$self->ioloop->next_tick($self->begin) unless $self->{pending};
return $self;
}

sub _wrap {
my ($self, $method, $new, $cb) = @_;

return sub { $new->$method(@{$self->{result}}) }
unless defined $cb;

return sub {
my @result;
unless (eval { @result = $cb->(@_); 1 }) {
$new->reject($@);
}

elsif (@result == 1 and blessed $result[0] and $result[0]->can('then')) {
$result[0]
->then(sub { $new->resolve(@_); () }, sub { $new->reject(@_); () });
}

else { $new->resolve(@result) }
};
}

1;

=encoding utf8
=head1 NAME
Mojo::IOLoop::Delay - Manage callbacks and control the flow of events
Mojo::IOLoop::Delay - Promises/A+ and flow-control helpers
=head1 SYNOPSIS
use Mojo::IOLoop::Delay;
# Wrap continuation-passing style APIs with promises
my $ua = Mojo::UserAgent->new;
sub get {
my $promise = Mojo::IOLoop->delay;
$ua->get(@_ => sub {
my ($ua, $tx) = @_;
$promise->resolve($tx);
});
return $promise;
}
my $mojo = get('http://mojolicious.org');
my $cpan = get('http://metacpan.org');
$mojo->race($cpan)->then(sub { say shift->req->url })->wait;
# Synchronize multiple non-blocking operations
my $delay = Mojo::IOLoop::Delay->new;
$delay->steps(sub { say 'BOOM!' });
Expand Down Expand Up @@ -114,15 +244,16 @@ Mojo::IOLoop::Delay - Manage callbacks and control the flow of events
say 'Never actually reached.';
}
)->catch(sub {
my ($delay, $err) = @_;
my $err = shift;
say "Something went wrong: $err";
})->wait;
=head1 DESCRIPTION
L<Mojo::IOLoop::Delay> manages callbacks and controls the flow of events for
L<Mojo::IOLoop>, which can help you avoid deep nested closures that often
result from continuation-passing style.
L<Mojo::IOLoop::Delay> is a Perl-ish implementation of
L<Promises/A+|https://promisesaplus.com> and provides flow-control helpers for
L<Mojo::IOLoop>, which can help you avoid deep nested closures that often result
from continuation-passing style.
use Mojo::IOLoop;
Expand Down Expand Up @@ -228,6 +359,16 @@ Remaining L</"steps"> in chain.
L<Mojo::IOLoop::Delay> inherits all methods from L<Mojo::EventEmitter> and
implements the following new ones.
=head2 all
my $new = $delay->all(@delays);
Returns a new L<Mojo::IOLoop::Delay> object that either fulfills when all of the
passed L<Mojo::IOLoop::Delay> objects have fulfilled or rejects as soon as one
of them rejects. If the returned promise fulfills, it is fulfilled with the
values from the fulfilled promises in the same order as the passed promises.
This method can be useful for aggregating results of multiple promises.
=head2 begin
my $cb = $delay->begin;
Expand Down Expand Up @@ -278,6 +419,15 @@ together to the next step or L</"finish"> event.
Mojo::IOLoop->client({port => 4000} => $delay->begin);
$delay->wait;
=head2 catch
my $new = $delay->catch(sub {...});
Appends a rejection handler callback to the promise, and returns a new
L<Mojo::IOLoop::Delay> object resolving to the return value of the callback if
it is called, or to its original fulfillment value if the promise is instead
fulfilled.
=head2 data
my $hash = $delay->data;
Expand All @@ -293,6 +443,14 @@ Data shared between all L</"steps">.
# Assign multiple values at once
$delay->data(foo => 'test', bar => 23);
=head2 finally
my $new = $delay->finally(sub {...});
Appends a fulfillment and rejection handler to the promise, and returns a new
L<Mojo::IOLoop::Delay> object resolving to the original fulfillment value or
rejection reason.
=head2 pass
$delay = $delay->pass;
Expand All @@ -304,6 +462,26 @@ next step.
# Longer version
$delay->begin(0)->(@args);
=head2 race
my $new = $delay->race(@delays);
Returns a new L<Mojo::IOLoop::Delay> object that fulfills or rejects as soon as
one of the passed L<Mojo::IOLoop::Delay> objects fulfills or rejects, with the
value or reason from that promise.
=head2 reject
$delay = $delay->reject(@results);
Reject the promise.
=head2 resolve
$delay = $delay->resolve(@results);
Resolve the promise.
=head2 steps
$delay = $delay->steps(sub {...}, sub {...});
Expand All @@ -312,14 +490,24 @@ Sequentialize multiple events, every time the event counter reaches zero a
callback will run, the first one automatically runs during the next reactor tick
unless it is delayed by incrementing the event counter. This chain will continue
until there are no L</"remaining"> callbacks, a callback does not increment the
event counter or an exception gets thrown in a callback.
event counter or an exception gets thrown in a callback. Finishing the chain
will also result in the promise being fulfilled, or if an exception got thrown
it will be rejected.
=head2 then
my $new = $delay->then(sub {...}, sub {...});
Appends fulfillment and rejection handlers to the promise, and returns a new
L<Mojo::IOLoop::Delay> object resolving to the return value of the called
handler.
=head2 wait
$delay->wait;
Start L</"ioloop"> and stop it again once an L</"error"> or L</"finish"> event
gets emitted, does nothing when L</"ioloop"> is already running.
Start L</"ioloop"> and stop it again once the promise has been fulfilled or
rejected, does nothing when L</"ioloop"> is already running.
=head1 SEE ALSO
Expand Down
6 changes: 4 additions & 2 deletions lib/Mojolicious/Plugin/DefaultHelpers.pm
Expand Up @@ -81,7 +81,8 @@ sub _delay {
my $c = shift;
my $tx = $c->render_later->tx;
my $delay = Mojo::IOLoop->delay(@_);
$delay->catch(sub { $c->helpers->reply->exception(pop) and undef $tx })->wait;
$delay->on(error => sub { $c->helpers->reply->exception(pop) and undef $tx });
$delay->wait;
}

sub _development {
Expand Down Expand Up @@ -314,7 +315,8 @@ of the steps, breaking the chain.
$c->render_later;
my $tx = $c->tx;
my $delay = Mojo::IOLoop->delay(sub {...}, sub {...});
$delay->catch(sub { $c->reply->exception(pop) and undef $tx })->wait;
$delay->on(error => sub { $c->helpers->reply->exception(pop) and undef $tx });
$delay->wait;
# Non-blocking request
$c->delay(
Expand Down

0 comments on commit bbe18c3

Please sign in to comment.