Skip to content

Commit

Permalink
add busy and wait events to Minion::Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Dec 15, 2017
1 parent c988ccf commit 49c322a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 7 deletions.
1 change: 1 addition & 0 deletions Changes
@@ -1,5 +1,6 @@

8.08 2017-12-15
- Added busy and wait events to Minion::Worker.
- Added dequeue_timeout option to run method in Minion::Worker.
- Added -D option to worker command.

Expand Down
59 changes: 52 additions & 7 deletions lib/Minion/Worker.pm
Expand Up @@ -33,6 +33,12 @@ sub info {
->{workers}[0];
}

sub new {
my $self = shift->SUPER::new(@_);
$self->on(busy => sub { sleep 1 });
return $self;
}

sub process_commands {
my $self = shift;

Expand Down Expand Up @@ -113,15 +119,14 @@ sub _work {
$jobs->{$_}->is_finished and ++$status->{performed} and delete $jobs->{$_}
for keys %$jobs;

# Wait if job limit has been reached or worker is stopping
my $timeout = $status->{dequeue_timeout};
if (($status->{jobs} <= keys %$jobs) || $self->{finished}) { sleep 1 }
# Job limit has been reached or worker is stopping
return $self->emit('busy')
if ($status->{jobs} <= keys %$jobs) || $self->{finished};

# Try to get more jobs
elsif (my $job = $self->dequeue($timeout => {queues => $status->{queues}})) {
$jobs->{my $id = $job->id} = $job->start;
my ($pid, $task) = ($job->pid, $job->task);
}
my ($max, $queues) = @{$status}{qw(dequeue_timeout queues)};
my $job = $self->emit('wait')->dequeue($max => {queues => $queues});
$jobs->{$job->id} = $job->start if $job;
}

1;
Expand Down Expand Up @@ -160,6 +165,22 @@ Stop immediately without finishing the current jobs.
L<Minion::Worker> inherits all events from L<Mojo::EventEmitter> and can emit
the following new ones.
=head2 busy
$worker->on(busy => sub {
my $worker = shift;
...
});
Emitted in the worker process when it is performing the maximum number of jobs
in parallel.
$worker->on(busy => sub {
my $worker = shift;
my $max = $worker->status->{jobs};
say "Performing $max jobs.";
});
=head2 dequeue
$worker->on(dequeue => sub {
Expand All @@ -175,6 +196,21 @@ Emitted in the worker process after a job has been dequeued.
say "Job $id has been dequeued.";
});
=head2 wait
$worker->on(wait => sub {
my $worker = shift;
...
});
Emitted in the worker process before it tries to dequeue a job.
$worker->on(wait => sub {
my $worker = shift;
my $max = $worker->status->{dequeue_timeout};
say "Waiting up to $max seconds for a new job.";
});
=head1 ATTRIBUTES
L<Minion::Worker> implements the following attributes.
Expand Down Expand Up @@ -302,6 +338,15 @@ Hash reference with whatever status information the worker would like to share.
=back
=head2 new
my $worker = Minion::Worker->new;
my $worker = Minion::Worker->new(status => {foo => 'bar'});
my $worker = Minion::Worker->new({status => {foo => 'bar'}});
Construct a new L<Minion::Worker> object and subscribe to L</"busy"> event with
default handler that sleeps for one second.
=head2 process_commands
$worker = $worker->process_commands;
Expand Down
3 changes: 3 additions & 0 deletions t/pg_worker.t
Expand Up @@ -31,7 +31,10 @@ $worker->on(
}
);
my $id = $minion->enqueue('test');
my $max;
$worker->once(wait => sub { $max = shift->status->{jobs} });
$worker->run;
is $max, 4, 'right value';
is_deeply $minion->job($id)->info->{result}, {just => 'works!'}, 'right result';

# Status
Expand Down

0 comments on commit 49c322a

Please sign in to comment.