Skip to content

Commit

Permalink
Ensure completed jobs are automatically "unscheduled" [IMMUTANT-535]
Browse files Browse the repository at this point in the history
We introduce a bi-directional mapping between names and jobkeys and
respond to the triggerComplete callback to delete the mapping if the job
will never fire again.
  • Loading branch information
jcrossley3 committed Mar 10, 2015
1 parent 6c400f2 commit af973d9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 4 deletions.
Expand Up @@ -23,13 +23,16 @@
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.DirectSchedulerFactory;
import org.quartz.listeners.SchedulerListenerSupport;
import org.quartz.listeners.TriggerListenerSupport;
import org.quartz.simpl.RAMJobStore;
import org.quartz.simpl.SimpleThreadPool;
import org.quartz.spi.JobStore;
Expand Down Expand Up @@ -66,6 +69,7 @@ public void start() throws Exception {
factory.createScheduler(threadPool, new RAMJobStore());

this.scheduler = factory.getScheduler();
this.scheduler.getListenerManager().addTriggerListener(new TriggerListener());
this.scheduler.start();
started = true;
log.info("Quartz started");
Expand Down Expand Up @@ -125,7 +129,7 @@ public synchronized boolean schedule(String name, Runnable fn, Map<ScheduleOptio

@Override
public synchronized boolean unschedule(String name) throws SchedulerException {
if (currentJobs.containsKey(name)) {
if (currentJobs.contains(name)) {
JobKey job = currentJobs.remove(name);
try {
this.scheduler.deleteJob(job);
Expand All @@ -141,7 +145,7 @@ public synchronized boolean unschedule(String name) throws SchedulerException {

/**
 * Returns the names of the jobs currently tracked in {@code currentJobs},
 * wrapped with {@link Collections#unmodifiableSet} so callers cannot modify
 * the result.
 *
 * @return a read-only set of scheduled job names
 */
@Override
public Set<String> scheduledJobs() {
// NOTE(review): the two return statements below appear to be the removed and
// the added side of the same diff hunk; presumably only the second one is
// part of the committed file -- confirm against the repository.
// First form: copies the key set into a fresh HashSet before wrapping it.
return Collections.unmodifiableSet(new HashSet<String>(this.currentJobs.keySet()));
// Second form: wraps whatever Jobs.getNames() hands back, without copying here.
return Collections.unmodifiableSet(this.currentJobs.getNames());
}

public synchronized JobKey lookupJob(String name) {
Expand Down Expand Up @@ -207,11 +211,50 @@ protected Trigger initTrigger(String name, Options<ScheduleOption> opts) {
return builder.build();
}

class TriggerListener extends TriggerListenerSupport {
public String getName() {
return "housekeeping";
}
public void triggerComplete(Trigger trigger, JobExecutionContext ctx, Trigger.CompletedExecutionInstruction i) {
if (!trigger.mayFireAgain()) {
QuartzScheduling.this.currentJobs.remove(ctx.getJobDetail().getKey());
}
}
}

/**
 * Bi-directional registry between job names and their {@link JobKey}s.
 *
 * <p>All access is synchronized on this instance: entries are removed both by
 * callers of the enclosing scheduler and by the trigger listener, which may
 * run on a different thread, and the backing {@link HashMap}s are not safe
 * for unsynchronized concurrent use.
 */
static class Jobs {
    private final Map<String, JobKey> names = new HashMap<>();
    private final Map<JobKey, String> keys = new HashMap<>();

    /**
     * Registers {@code name <-> key}, replacing any previous association of
     * either side so the two maps never disagree.
     *
     * @param name the job name
     * @param key  the key of the job registered under that name
     */
    synchronized void put(String name, JobKey key) {
        JobKey staleKey = this.names.put(name, key);
        if (staleKey != null && !staleKey.equals(key)) {
            // The name pointed at another key before; drop that key's reverse entry,
            // otherwise a later remove(staleKey) would delete the new mapping.
            this.keys.remove(staleKey);
        }
        String staleName = this.keys.put(key, name);
        if (staleName != null && !staleName.equals(name)) {
            // The key was known under another name before; forget the old name.
            this.names.remove(staleName);
        }
    }

    /**
     * Removes the mapping for {@code name} in both directions.
     *
     * @param name the job name
     * @return the key that was registered for the name, or {@code null} if none
     */
    synchronized JobKey remove(String name) {
        JobKey key = this.names.remove(name);
        // On a miss key is null; removing null from the reverse map is harmless.
        this.keys.remove(key);
        return key;
    }

    /**
     * Removes the mapping for {@code key} in both directions.
     *
     * @param key the job key
     * @return the name that was registered for the key, or {@code null} if none
     */
    synchronized String remove(JobKey key) {
        String name = this.keys.remove(key);
        // On a miss name is null; removing null from the forward map is harmless.
        this.names.remove(name);
        return name;
    }

    /**
     * @param name the job name
     * @return the key registered for the name, or {@code null} if none
     */
    synchronized JobKey get(String name) {
        return this.names.get(name);
    }

    /**
     * @param name the job name
     * @return {@code true} if a key is registered under the name
     */
    synchronized boolean contains(String name) {
        return this.names.containsKey(name);
    }

    /**
     * Returns a snapshot of the registered names.
     *
     * <p>A copy is returned rather than the live key-set view so callers can
     * iterate it while another thread adds or removes jobs.
     *
     * @return a new set holding the names registered at the time of the call
     */
    synchronized Set<String> getNames() {
        return new HashSet<>(this.names.keySet());
    }
}

private final String name;
private int numThreads;
private boolean started;
private Scheduler scheduler;
private final Map<String, JobKey> currentJobs = new HashMap<>();
private final Jobs currentJobs = new Jobs();

private static final Logger log = WunderBoss.logger(Scheduling.class);
}
Expand Up @@ -76,7 +76,7 @@
(reset! should-run? false))))

(deftest scheduledJobs
(with-job #() {}
(with-job #() {:every 100}
(let [jobs (.scheduledJobs default)]
(is (= #{"a-job"} jobs))
;; it should be unmodifiable
Expand Down Expand Up @@ -216,3 +216,12 @@
(is (thrown?
IllegalArgumentException
(with-job* (fn []) {:until 5} (fn []))))))

;; A job whose trigger will never fire again must disappear from the
;; scheduler's bookkeeping on its own, so a later unschedule finds nothing.
(deftest completed-jobs-should-auto-unschedule
  (is (empty? (.scheduledJobs default)))
  (let [p (promise)
        id "auto-unschedule"]
    (.schedule default id #(deliver p :success) {})
    (is (= :success (deref p 1000 :failure)))
    ;; The promise is delivered from inside the job body, but the scheduler
    ;; only runs its trigger-complete housekeeping after the job returns.
    ;; Poll briefly (at most ~1s) instead of racing that callback.
    (loop [attempts 100]
      (when (and (pos? attempts) (seq (.scheduledJobs default)))
        (Thread/sleep 10)
        (recur (dec attempts))))
    (is (false? (.unschedule default id))))
  (is (empty? (.scheduledJobs default))))

0 comments on commit af973d9

Please sign in to comment.