Skip to content

Commit

Permalink
master/client: periodic schedule support
Browse files: browse the repository at this point in the history
  • Loading branch information
sbourdeauducq committed Dec 10, 2014
1 parent 347410a commit c3953d8
Showing 2 changed files with 74 additions and 12 deletions.
62 changes: 54 additions & 8 deletions artiq/management/scheduler.py
Original file line number Diff line number Diff line change
@@ -7,16 +7,23 @@
class Scheduler:
def __init__(self, *args, **kwargs):
    """Create a scheduler with an empty run queue and no periodic entries.

    All positional and keyword arguments are forwarded unchanged to
    ``Worker``.
    """
    self.worker = Worker(*args, **kwargs)
    # Next run identifier (RID) handed out by new_rid().
    self.next_rid = 0
    # None, or a (rid, run_params, timeout, start_time) tuple as unpacked
    # by get_schedule().
    self.currently_executing = None
    # Pending one-shot runs; _schedule() pops (rid, run_params, timeout)
    # entries from the front.
    self.queued = []
    # _schedule() acquires this once before popping an entry from
    # self.queued; it starts at 0 because the queue starts empty.
    self.queue_count = asyncio.Semaphore(0)
    # Maps PRID -> (next_run, run_params, timeout, period).
    self.periodic = dict()
    # Set by run_periodic() so that _schedule() wakes up and re-evaluates
    # the periodic schedule.
    self.periodic_modified = asyncio.Event()

def new_rid(self):
    """Allocate a fresh run identifier (RID).

    Returns the current value of ``self.next_rid`` and advances the counter
    by one, so consecutive calls yield consecutive integers.
    """
    allocated = self.next_rid
    self.next_rid = allocated + 1
    return allocated

def new_prid(self):
    """Return the smallest non-negative integer not in use as a PRID.

    With ``n`` entries in ``self.periodic``, at least one of the ``n + 1``
    candidates ``0..n`` must be free, so the search always succeeds.
    Scanning the candidates in ascending order makes the result
    deterministic instead of depending on set iteration order.
    """
    candidates = range(len(self.periodic) + 1)
    return next(prid for prid in candidates if prid not in self.periodic)

@asyncio.coroutine
def start(self):
self.task = asyncio.Task(self._schedule())
@@ -46,13 +53,16 @@ def get_schedule(self):
else:
rid, run_params, timeout, t = self.currently_executing
ce = rid, run_params, timeout, time() - t
return ce, self.queued
return ce, self.queued, self.periodic

def run_periodic(self, run_params, timeout, period):
    """Register a run to be executed repeatedly and return its PRID.

    The entry is stored in ``self.periodic`` as
    ``(next_run, run_params, timeout, period)``. ``next_run`` starts at 0,
    a timestamp that is already in the past, so the entry is due as soon
    as the scheduler next evaluates the periodic schedule.

    :param run_params: parameters of the run, passed through unchanged.
    :param timeout: timeout of each run, passed through unchanged.
    :param period: value added to the current time to compute the next
        run each time the entry is executed.
    :return: the identifier (PRID) assigned to the new periodic entry.
    """
    prid = self.new_prid()
    self.periodic[prid] = 0, run_params, timeout, period
    # Wake up the scheduling loop so it takes the new entry into account.
    self.periodic_modified.set()
    return prid

def cancel_periodic(self, prid):
    """Remove the periodic entry identified by ``prid``.

    :param prid: identifier previously returned by ``run_periodic``.
    :raises KeyError: if no periodic entry with this PRID exists.
    """
    del self.periodic[prid]

@asyncio.coroutine
def _run(self, rid, run_params, timeout):
@@ -62,9 +72,45 @@ def _run(self, rid, run_params, timeout):
return result

@asyncio.coroutine
def _run_periodic(self):
    """Execute periodic entries that are due, one after another.

    Repeatedly picks the entry with the smallest ``next_run`` timestamp.
    While that entry is due it is rescheduled and run; the loop stops as
    soon as nothing is due.

    :return: ``None`` if there are no periodic entries, otherwise the
        (positive) number of seconds until the earliest entry is due.
    """
    while True:
        if not self.periodic:
            return None

        # Earliest entry; on ties min() keeps the first one encountered,
        # which matches a strict "<" comparison in a manual scan.
        min_prid, params = min(self.periodic.items(),
                               key=lambda item: item[1][0])
        now = time()
        delay = params[0] - now
        if delay > 0:
            return delay

        _, run_params, timeout, period = params
        # The next run is computed from `now`, taken before the run
        # starts, so the period is measured start-to-start.
        self.periodic[min_prid] = now + period, run_params, timeout, period

        rid = self.new_rid()
        result = yield from self._run(rid, run_params, timeout)
        # Report the entry that actually ran, not the last key iterated.
        print(min_prid, rid, result)

@asyncio.coroutine
def _schedule(self):
    """Main scheduling loop: runs queued and periodic entries forever.

    Each iteration sleeps until one of three things happens: an entry
    becomes available in the run queue, the periodic schedule is
    modified, or the earliest periodic entry becomes due (the timeout).
    Due periodic entries are run first, then at most one queued entry.
    """
    next_periodic = yield from self._run_periodic()
    while True:
        ev_queue = asyncio.Task(self.queue_count.acquire())
        ev_periodic = asyncio.Task(self.periodic_modified.wait())
        done, pend = yield from asyncio.wait(
            [ev_queue, ev_periodic],
            timeout=next_periodic,
            return_when=asyncio.FIRST_COMPLETED)
        # Waiters that did not fire are recreated on the next iteration.
        for t in pend:
            t.cancel()
        # Re-arm the notification. Without this the event stays set after
        # the first run_periodic() call, wait() returns immediately on
        # every iteration and the loop spins. Clearing before the schedule
        # is re-read below means no modification can be missed.
        self.periodic_modified.clear()

        next_periodic = yield from self._run_periodic()
        if ev_queue in done:
            rid, run_params, timeout = self.queued.pop(0)
            result = yield from self._run(rid, run_params, timeout)
            print(rid, result)
24 changes: 20 additions & 4 deletions frontend/artiq_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import argparse
import time

from prettytable import PrettyTable

@@ -70,10 +71,8 @@ def _action_cancel(remote, args):


def _action_show(remote, args):
ce, queue = remote.get_schedule()
if ce is None and not queue:
print("Queue is empty")
else:
ce, queue, periodic = remote.get_schedule()
if ce is not None or queue:
table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"])
if ce is not None:
rid, run_params, timeout, t = ce
@@ -89,6 +88,23 @@ def _action_show(remote, args):
table.add_row(row)
print("Run queue:")
print(table)
else:
print("Queue is empty")
if periodic:
table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function",
"Timeout", "Period"])
print("Periodic schedule:")
sp = sorted(periodic.items(), key=lambda x: x[1][0])
for prid, (next_run, run_params, timeout, period) in sp:
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
prid, run_params["file"]]
for x in run_params["unit"], run_params["function"], timeout:
row.append("-" if x is None else x)
row.append(period)
table.add_row(row)
print(table)
else:
print("No periodic schedule")


def main():

0 comments on commit c3953d8

Please sign in to comment.