Skip to content

Commit

Permalink
start with a clean implementation of Promises/A+
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Oct 24, 2017
1 parent 24d4fd2 commit 8de82b9
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 90 deletions.
109 changes: 80 additions & 29 deletions lib/Mojo/IOLoop/Delay.pm
Expand Up @@ -3,11 +3,31 @@ use Mojo::Base 'Mojo::EventEmitter';

use Mojo::IOLoop;
use Mojo::Util;
use Scalar::Util 'weaken';
use Scalar::Util qw(blessed weaken);

has ioloop => sub { Mojo::IOLoop->singleton };
has remaining => sub { [] };

sub all {
my @promises = @_;

my $all = $promises[0]->_clone;

my $results = [];
my $remaining = scalar @promises;
for my $i (0 .. $#promises) {
$promises[$i]->then(
sub {
$results->[$i] = [@_];
$all->resolve(@$results) if --$remaining <= 0;
},
sub { $all->reject(@_) },
);
}

return $all;
}

sub begin {
my ($self, $offset, $len) = @_;
$self->{pending}++;
Expand All @@ -22,14 +42,14 @@ sub data { Mojo::Util::_stash(data => @_) }
sub pass { $_[0]->begin->(@_) }

sub race {
my $self = shift;
my $next = $self->_clone;
$_->then(sub { $next->resolve(@_) }, sub { $next->reject(@_) }) for $self, @_;
return $next;
my @promises = @_;
my $race = $promises[0]->_clone;
$_->then(sub { $race->resolve(@_) }, sub { $race->reject(@_) }) for @promises;
return $race;
}

sub reject { shift->_finish(error => @_) }
sub resolve { shift->_finish(finish => @_) }
sub reject { shift->_settle('reject', @_) }
sub resolve { shift->_settle('resolve', @_) }

sub steps {
my $self = shift->remaining([@_]);
Expand All @@ -38,20 +58,15 @@ sub steps {
}

sub then {
my ($self, $finish, $error) = @_;
my ($self, $resolve, $reject) = @_;

my $next = $self->_clone;
$self->on(finish => sub { shift; $next->resolve(@_) });
$self->on(error => sub { shift; $next->reject(@_) });
$next->on(finish => sub { shift; $finish->(@_) }) if $finish;
$next->on(error => sub { shift; $error->(@_) }) if $error;
my $new = $self->_clone;
push @{$self->{resolve}}, $self->_wrap('resolve', $new, $resolve);
push @{$self->{reject}}, $self->_wrap('reject', $new, $reject);

return $next unless $self->{settled};
$self->_defer if $self->{result};

my $method = $self->{error} ? 'reject' : 'resolve';
my $args = $self->{error} || $self->{finish};
$next->ioloop->next_tick(sub { $next->$method(@$args) });
return $next;
return $new;
}

sub wait {
Expand All @@ -69,34 +84,66 @@ sub _clone {
return $clone;
}

sub _defer {
my $self = shift;

my $cbs = $self->{status} eq 'resolve' ? $self->{resolve} : $self->{reject};
@$self{qw(resolve reject)} = ([], []);
my $results = $self->{result};

$self->ioloop->next_tick(sub { $_->(@$results) for @$cbs });
}

sub _die { $_[0]->has_subscribers('error') ? $_[0]->ioloop->stop : die $_[1] }

sub _finish {
my ($self, $event) = (shift, shift);
$self->{settled} ? return $self : $self->{settled}++;
$self->{$event} = [@_];
return $self->remaining([])->emit($event => @_);
sub _settle {
my ($self, $status) = (shift, shift);
@{$self}{qw(result status)} = ([@_], $status);
$self->_defer;
return $self;
}

sub _step {
my ($self, $id, $offset, $len) = (shift, shift, shift, shift);

$self->{args}[$id]
= [@_ ? defined $len ? splice @_, $offset, $len : splice @_, $offset : ()];
return $self if $self->{settled} || --$self->{pending} || $self->{lock};
return $self if $self->{fail} || --$self->{pending} || $self->{lock};
local $self->{lock} = 1;
my @args = map {@$_} @{delete $self->{args}};

$self->{counter} = 0;
if (my $cb = shift @{$self->remaining}) {
eval { $self->$cb(@args); 1 } or $self->reject($@);
eval { $self->$cb(@args); 1 }
or (++$self->{fail} and return $self->remaining([])->emit(error => $@));
}

return $self->resolve(@args) unless $self->{counter};
return $self->remaining([])->emit(finish => @args) unless $self->{counter};
$self->ioloop->next_tick($self->begin) unless $self->{pending};
return $self;
}

sub _wrap {
my ($self, $method, $new, $cb) = @_;

return sub { $new->$method(@{$self->{result}}) }
unless defined $cb;

return sub {
my @result;
unless (eval { @result = $cb->(@_); 1 }) {
$new->reject($@);
}

elsif (@result == 1 and blessed $result[0] and $result[0]->can('then')) {
$result[0]
->then(sub { $new->resolve(@_); () }, sub { $new->reject(@_); () });
}

else { $new->resolve(@result) }
};
}

1;

=encoding utf8
Expand Down Expand Up @@ -272,6 +319,10 @@ Remaining L</"steps"> in chain.
L<Mojo::IOLoop::Delay> inherits all methods from L<Mojo::EventEmitter> and
implements the following new ones.
=head2 all
my $new = $delay->all(@delays);
=head2 begin
my $cb = $delay->begin;
Expand Down Expand Up @@ -324,7 +375,7 @@ together to the next step or L</"finish"> event.
=head2 catch
my $thenable = $delay->catch(sub {...});
my $new = $delay->catch(sub {...});
=head2 data
Expand Down Expand Up @@ -354,7 +405,7 @@ next step.
=head2 race
my $thenable = $delay->race(@thenables);
my $new = $delay->race(@delays);
=head2 reject
Expand All @@ -376,7 +427,7 @@ event counter or an exception gets thrown in a callback.
=head2 then
my $thenable = $delay->then(sub {...}, sub {...});
my $new = $delay->then(sub {...}, sub {...});
=head2 wait
Expand Down
7 changes: 4 additions & 3 deletions lib/Mojolicious/Plugin/DefaultHelpers.pm
Expand Up @@ -81,8 +81,8 @@ sub _delay {
my $c = shift;
my $tx = $c->render_later->tx;
my $delay = Mojo::IOLoop->delay(@_);
$delay->catch(sub { $c->helpers->reply->exception(shift) and undef $tx })
->wait;
$delay->on(error => sub { $c->helpers->reply->exception(pop) and undef $tx });
$delay->wait;
}

sub _development {
Expand Down Expand Up @@ -315,7 +315,8 @@ of the steps, breaking the chain.
$c->render_later;
my $tx = $c->tx;
my $delay = Mojo::IOLoop->delay(sub {...}, sub {...});
$delay->catch(sub { $c->reply->exception(shift) and undef $tx })->wait;
$delay->on(error => sub { $c->helpers->reply->exception(pop) and undef $tx });
$delay->wait;
# Non-blocking request
$c->delay(
Expand Down
136 changes: 78 additions & 58 deletions t/mojo/delay.t
Expand Up @@ -6,9 +6,82 @@ use Test::More;
use Mojo::IOLoop;
use Mojo::IOLoop::Delay;

# Basic functionality
# Promise (resolved)
my $delay = Mojo::IOLoop::Delay->new;
my @results;
my (@results, @errors);
$delay->then(sub { @results = @_ }, sub { @errors = @_ });
$delay->resolve('hello', 'world');
Mojo::IOLoop->one_tick;
is_deeply \@results, ['hello', 'world'], 'promise resolved';
is_deeply \@errors, [], 'promise not rejected';

# Promise (rejected)
$delay = Mojo::IOLoop::Delay->new;
(@results, @errors) = ();
$delay->then(sub { @results = @_ }, sub { @errors = @_ });
$delay->reject('bye', 'world');
Mojo::IOLoop->one_tick;
is_deeply \@results, [], 'promise not resolved';
is_deeply \@errors, ['bye', 'world'], 'promise rejected';

# Promise (chained)
$delay = Mojo::IOLoop::Delay->new;
@results = ();
$delay->then(sub {"$_[0]:1"})->then(sub {"$_[0]:2"})->then(sub {"$_[0]:3"})
->then(sub { push @results, "$_[0]:4" });
$delay->resolve('test');
Mojo::IOLoop->one_tick;
is_deeply \@results, ['test:1:2:3:4'], 'promises resolved';

# Promise (resolved nested)
$delay = Mojo::IOLoop::Delay->new;
my $delay2 = Mojo::IOLoop::Delay->new;
@results = ();
$delay->then(sub {$delay2})->then(sub { @results = @_ });
$delay->resolve;
Mojo::IOLoop->one_tick;
is_deeply \@results, [], 'promise not resolved';
$delay2->resolve('works too');
Mojo::IOLoop->one_tick;
is_deeply \@results, ['works too'], 'promise resolved';

# Promise (exception in chain)
$delay = Mojo::IOLoop::Delay->new;
(@results, @errors) = ();
$delay->then(sub {@_})->then(sub {@_})->then(sub { die "test: $_[0]\n" })
->then(sub { push @results, 'fail' })->catch(sub { @errors = @_ });
$delay->resolve('works');
Mojo::IOLoop->one_tick;
is_deeply \@results, [], 'promises not resolved';
is_deeply \@errors, ["test: works\n"], 'promises rejected';

# Promise (race)
$delay = Mojo::IOLoop::Delay->new->then(sub {@_});
$delay2 = Mojo::IOLoop::Delay->new->then(sub {@_});
my $delay3 = Mojo::IOLoop::Delay->new->then(sub {@_});
@results = ();
$delay->race($delay2, $delay3)->then(sub { @results = @_ });
$delay2->resolve('second');
$delay3->resolve('third');
$delay->resolve('first');
Mojo::IOLoop->one_tick;
is_deeply \@results, ['second'], 'promises resolved';

# Promise (all)
$delay = Mojo::IOLoop::Delay->new->then(sub {@_});
$delay2 = Mojo::IOLoop::Delay->new->then(sub {@_});
$delay3 = Mojo::IOLoop::Delay->new->then(sub {@_});
@results = ();
$delay->all($delay2, $delay3)->then(sub { @results = @_ });
$delay2->resolve('second');
$delay3->resolve('third');
$delay->resolve('first');
Mojo::IOLoop->one_tick;
is_deeply \@results, [['first'], ['second'], ['third']], 'promises resolved';

# Basic functionality
$delay = Mojo::IOLoop::Delay->new;
@results = ();
for my $i (1, 1) {
my $end = $delay->begin;
Mojo::IOLoop->next_tick(sub { push @results, $i; $end->() });
Expand All @@ -20,61 +93,6 @@ $end2->();
$delay->wait;
is_deeply \@results, [1, 1], 'right results';

# Thenable
my ($resolve, $reject, $resolve2, $reject2);
$delay = Mojo::IOLoop::Delay->new;
my $delay2 = $delay->then(sub { $resolve = shift }, sub { $reject = shift });
$delay2->then(sub { $resolve2 = shift }, sub { $reject2 = shift });
$delay->resolve('works');
is $resolve, 'works', 'promise was resolved';
is $resolve2, 'works', 'promise was resolved';
is $reject, undef, 'promise was not rejected';
is $reject2, undef, 'promise was not rejected';

# Thenable (race)
($resolve, $reject) = ();
my $promise = Mojo::IOLoop::Delay->new;
my $promise2 = Mojo::IOLoop::Delay->new;
my $promise3 = Mojo::IOLoop::Delay->new;
$promise->race($promise2, $promise3)
->then(sub { $resolve = shift }, sub { $reject = shift });
$promise->resolve('first');
$promise2->resolve('second');
$promise3->resolve('third');
is $resolve, 'first', 'promise was resolved';
is $reject, undef, 'promise was not rejected';
($resolve, $reject) = ();
$promise = Mojo::IOLoop::Delay->new;
$promise2 = Mojo::IOLoop::Delay->new;
$promise3 = Mojo::IOLoop::Delay->new;
$promise->race($promise2, $promise3)
->then(sub { $resolve = shift }, sub { $reject = shift });
$promise3->reject('third');
$promise->reject('first');
$promise2->reject('second');
is $resolve, undef, 'promise was not resolved';
is $reject, 'third', 'promise was rejected';

# Thenable (already settled)
($resolve, $reject) = ();
$delay = Mojo::IOLoop::Delay->new;
$delay->resolve('works');
$delay->then(sub { $resolve = shift }, sub { $reject = shift });
is $resolve, undef, 'promise was not resolved';
is $reject, undef, 'promise was not rejected';
$delay->ioloop->one_tick;
is $resolve, 'works', 'promise was resolved';
is $reject, undef, 'promise was not rejected';
($resolve, $reject) = ();
$delay = Mojo::IOLoop::Delay->new->catch(sub { });
$delay->reject('works too');
$delay->then(sub { $resolve = shift }, sub { $reject = shift });
is $resolve, undef, 'promise was not resolved';
is $reject, undef, 'promise was not rejected';
$delay->ioloop->one_tick;
is $resolve, undef, 'promise was not resolved';
is $reject, 'works too', 'promise was rejected';

# Argument splicing
$delay = Mojo::IOLoop::Delay->new;
Mojo::IOLoop->next_tick($delay->begin);
Expand Down Expand Up @@ -301,7 +319,9 @@ $delay->steps(
},
sub { die 'Second step!' },
sub { $result = 'failed' }
)->catch(sub { $failed = shift })->wait;
);
$delay->on(error => sub { $failed = pop });
$delay->wait;
is_deeply $delay->remaining, [], 'no remaining steps';
like $failed, qr/^Second step!/, 'right error';
ok !$finished, 'finish event has not been emitted';
Expand Down

0 comments on commit 8de82b9

Please sign in to comment.