Skip to content

Commit c3953d8

Browse files
committed Dec 10, 2014
master/client: periodic schedule support
1 parent 347410a commit c3953d8

File tree

2 files changed

+74
-12
lines changed

2 files changed

+74
-12
lines changed
 

‎artiq/management/scheduler.py

+54-8
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,23 @@
77
class Scheduler:
88
def __init__(self, *args, **kwargs):
    """Create the scheduler state; all arguments are forwarded to ``Worker``."""
    self.worker = Worker(*args, **kwargs)
    # Counter handed out by new_rid().
    self.next_rid = 0
    # None, or a (rid, run_params, timeout, start_time) tuple as unpacked
    # by get_schedule().
    self.currently_executing = None
    # Pending one-shot runs, consumed from the front by _schedule().
    self.queued = []
    # Acquired by _schedule() before it pops an entry from ``queued``.
    self.queue_count = asyncio.Semaphore(0)
    # prid -> (next_run, run_params, timeout, period)
    self.periodic = dict()
    # Set by run_periodic() so that _schedule() rescans the periodic table.
    self.periodic_modified = asyncio.Event()
1416

1517
def new_rid(self):
    """Return the current run identifier and advance the counter by one."""
    rid = self.next_rid
    self.next_rid += 1
    return rid
1921

22+
def new_prid(self):
    """Return the smallest non-negative integer not used as a key of
    ``self.periodic``.

    With ``n`` entries in the table, at most ``n`` of the ``n + 1``
    candidates ``0..n`` can be taken, so a free identifier always exists.
    Picking the smallest one makes the result deterministic instead of
    depending on set iteration order.
    """
    taken = self.periodic
    return next(i for i in range(len(taken) + 1) if i not in taken)
26+
2027
@asyncio.coroutine
2128
def start(self):
2229
self.task = asyncio.Task(self._schedule())
@@ -46,13 +53,16 @@ def get_schedule(self):
4653
else:
4754
rid, run_params, timeout, t = self.currently_executing
4855
ce = rid, run_params, timeout, time() - t
49-
return ce, self.queued
56+
return ce, self.queued, self.periodic
5057

5158
def run_periodic(self, run_params, timeout, period):
    """Register a periodic run and return its identifier (PRID).

    The entry is stored in ``self.periodic`` as
    ``(next_run, run_params, timeout, period)``. ``next_run`` starts at 0,
    which is below any value ``_run_periodic`` obtains from ``time()``, so
    the first run is due immediately. ``periodic_modified`` is set so the
    scheduling loop notices the new entry.
    """
    prid = self.new_prid()
    self.periodic[prid] = 0, run_params, timeout, period
    self.periodic_modified.set()
    return prid
5363

5464
def cancel_periodic(self, prid):
    """Remove the periodic entry ``prid`` from ``self.periodic``.

    Raises ``KeyError`` if no entry with that identifier exists.
    """
    del self.periodic[prid]
5666

5767
@asyncio.coroutine
5868
def _run(self, rid, run_params, timeout):
@@ -62,9 +72,45 @@ def _run(self, rid, run_params, timeout):
6272
return result
6373

6474
@asyncio.coroutine
65-
def _schedule(self):
75+
def _run_periodic(self):
    """Run every periodic entry that is due, then report the next deadline.

    Generator-based coroutine. Repeatedly picks the entry of
    ``self.periodic`` with the earliest ``next_run``; while that entry is
    due it is rescheduled to ``now + period`` and executed through
    ``self._run`` under a fresh RID.

    Returns ``None`` when there are no periodic entries, otherwise the
    (positive) number of seconds until the earliest entry becomes due.
    """
    while True:
        if not self.periodic:
            return None

        # Earliest deadline first; ties resolve to the first entry in
        # iteration order.
        min_prid = min(self.periodic, key=lambda p: self.periodic[p][0])
        next_run, run_params, timeout, period = self.periodic[min_prid]

        now = time()
        delay = next_run - now
        if delay > 0:
            return delay

        # Reschedule before running so the table is already up to date
        # while the run is in progress.
        self.periodic[min_prid] = now + period, run_params, timeout, period

        rid = self.new_rid()
        result = yield from self._run(rid, run_params, timeout)
        # Report the entry that was actually run, not whichever key a
        # scan of the table happened to visit last.
        print(min_prid, rid, result)
98+
99+
@asyncio.coroutine
100+
def _schedule(self):
    """Main scheduling loop (generator-based coroutine, never returns).

    Each iteration waits until a queued run is available, the periodic
    table has been modified, or the next periodic deadline expires. It
    then runs all due periodic entries and, if a queued run was acquired,
    pops and runs it.
    """
    next_periodic = yield from self._run_periodic()
    while True:
        ev_queue = asyncio.Task(self.queue_count.acquire())
        ev_periodic = asyncio.Task(self.periodic_modified.wait())
        done, pend = yield from asyncio.wait(
            [ev_queue, ev_periodic],
            timeout=next_periodic,
            return_when=asyncio.FIRST_COMPLETED)
        for t in pend:
            t.cancel()

        # Reset the flag before rescanning: an Event stays set until it
        # is cleared, so without this wait() would complete immediately
        # on every later iteration and the loop would spin. Anything that
        # sets it again while a run is in progress is picked up on the
        # next iteration.
        self.periodic_modified.clear()
        next_periodic = yield from self._run_periodic()
        if ev_queue in done:
            rid, run_params, timeout = self.queued.pop(0)
            result = yield from self._run(rid, run_params, timeout)
            print(rid, result)
            # The delay computed above is stale after a long run; refresh
            # it (and run anything that became due meanwhile).
            next_periodic = yield from self._run_periodic()

‎frontend/artiq_client.py

+20-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python3
22

33
import argparse
4+
import time
45

56
from prettytable import PrettyTable
67

@@ -70,10 +71,8 @@ def _action_cancel(remote, args):
7071

7172

7273
def _action_show(remote, args):
73-
ce, queue = remote.get_schedule()
74-
if ce is None and not queue:
75-
print("Queue is empty")
76-
else:
74+
ce, queue, periodic = remote.get_schedule()
75+
if ce is not None or queue:
7776
table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"])
7877
if ce is not None:
7978
rid, run_params, timeout, t = ce
@@ -89,6 +88,23 @@ def _action_show(remote, args):
8988
table.add_row(row)
9089
print("Run queue:")
9190
print(table)
91+
else:
92+
print("Queue is empty")
93+
if periodic:
94+
table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function",
95+
"Timeout", "Period"])
96+
print("Periodic schedule:")
97+
sp = sorted(periodic.items(), key=lambda x: x[1][0])
98+
for prid, (next_run, run_params, timeout, period) in sp:
99+
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
100+
prid, run_params["file"]]
101+
for x in run_params["unit"], run_params["function"], timeout:
102+
row.append("-" if x is None else x)
103+
row.append(period)
104+
table.add_row(row)
105+
print(table)
106+
else:
107+
print("No periodic schedule")
92108

93109

94110
def main():

0 commit comments

Comments
 (0)
Please sign in to comment.