Skip to content

Commit 347410a

Browse files
committed Dec 10, 2014
master/client: queue display and cancellations
1 parent 0dc4eb0 commit 347410a

File tree

3 files changed

+102
-17
lines changed

3 files changed

+102
-17
lines changed
 

‎artiq/management/scheduler.py

+40-4
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
import asyncio
2+
from time import time
23

34
from artiq.management.worker import Worker
45

56

67
class Scheduler:
78
def __init__(self, *args, **kwargs):
    """Create the worker and start with an empty run queue.

    Every positional and keyword argument is forwarded unchanged to
    ``Worker``.
    """
    self.worker = Worker(*args, **kwargs)
    # Pending runs, stored as (rid, run_params, timeout) tuples.
    self.queued = []
    # Starts at zero; run_once() releases it once per queued entry and
    # _schedule() acquires it before taking an entry.
    self.queue_count = asyncio.Semaphore(0)
    # RIDs are handed out sequentially, starting from 0.
    self.next_rid = 0
    # None while idle; otherwise (rid, run_params, timeout, start_time).
    self.currently_executing = None
1114

15+
def new_rid(self):
    """Return the next unused run identifier and advance the counter."""
    allocated = self.next_rid
    self.next_rid = allocated + 1
    return allocated
19+
1220
@asyncio.coroutine
1321
def start(self):
1422
self.task = asyncio.Task(self._schedule())
@@ -22,13 +30,41 @@ def stop(self):
2230
yield from self.worker.end_process()
2331

2432
def run_once(self, run_params, timeout):
    """Append a run to the queue and return the RID assigned to it.

    The entry is stored as ``(rid, run_params, timeout)``; the
    ``queue_count`` semaphore is released once so the scheduling loop
    can pick the entry up.
    """
    assigned = self.new_rid()
    entry = (assigned, run_params, timeout)
    self.queued.append(entry)
    self.queue_count.release()
    return assigned
37+
38+
def cancel_once(self, rid):
    """Remove the queued run identified by *rid*.

    Only entries still waiting in ``self.queued`` are considered.

    Raises:
        KeyError: if no queued entry carries that RID.
    """
    # An explicit loop is used instead of a bare next(): next() signals
    # "not found" with StopIteration, which silently terminates any
    # generator-based coroutine it propagates through.
    for idx, (qrid, _, _) in enumerate(self.queued):
        if qrid == rid:
            del self.queued[idx]
            # The permit run_once() released for this entry is left in
            # queue_count, so the consumer may find fewer entries than
            # permits.
            return
    raise KeyError("no queued run with RID {}".format(rid))
42+
43+
def get_schedule(self):
    """Return ``(current, queued)`` describing the scheduler state.

    ``current`` is ``None`` when nothing is executing; otherwise it is
    ``(rid, run_params, timeout, elapsed)``, where ``elapsed`` is the
    number of seconds since the run was started. ``queued`` is the
    pending list itself, not a copy.
    """
    running = self.currently_executing
    if running is not None:
        rid, run_params, timeout, started = running
        return (rid, run_params, timeout, time() - started), self.queued
    return None, self.queued
50+
51+
def run_periodic(self, run_params, timeout, period):
    """Placeholder for periodic scheduling; always raises NotImplementedError."""
    raise NotImplementedError()
53+
54+
def cancel_periodic(self, prid):
    """Placeholder for cancelling a periodic run; always raises NotImplementedError."""
    raise NotImplementedError()
56+
57+
@asyncio.coroutine
58+
def _run(self, rid, run_params, timeout):
59+
self.currently_executing = rid, run_params, timeout, time()
60+
result = yield from self.worker.run(run_params, timeout)
61+
self.currently_executing = None
62+
return result
2763

2864
@asyncio.coroutine
2965
def _schedule(self):
3066
while True:
3167
yield from self.queue_count.acquire()
32-
run_params, timeout = self.queued.pop(0)
33-
result = yield from self.worker.run(run_params, timeout)
34-
print(result)
68+
rid, run_params, timeout = self.queued.pop(0)
69+
result = yield from self._run(rid, run_params, timeout)
70+
print(rid, result)

‎frontend/artiq_client.py

+61-11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import argparse
44

5+
from prettytable import PrettyTable
6+
57
from artiq.management.pc_rpc import Client
68

79

@@ -15,8 +17,9 @@ def _get_args():
1517
help="TCP port to use to connect to the master")
1618

1719
subparsers = parser.add_subparsers(dest="action")
20+
subparsers.required = True
1821

19-
parser_add = subparsers.add_parser("add", help="add an experiment")
22+
parser_add = subparsers.add_parser("submit", help="submit an experiment")
2023
parser_add.add_argument(
2124
"-p", "--periodic", default=None, type=float,
2225
help="run the experiment periodically every given number of seconds")
@@ -29,23 +32,70 @@ def _get_args():
2932
help="unit to run")
3033
parser_add.add_argument("file", help="file containing the unit to run")
3134

35+
36+
parser_cancel = subparsers.add_parser("cancel",
37+
help="cancel an experiment")
38+
parser_cancel.add_argument("-p", "--periodic", default=False,
39+
action="store_true",
40+
help="cancel a periodic experiment")
41+
parser_cancel.add_argument("rid", type=int,
42+
help="run identifier (RID/PRID)")
43+
44+
parser_show = subparsers.add_parser("show",
45+
help="show the experiment schedule")
46+
3247
return parser.parse_args()
3348

3449

50+
def _action_submit(remote, args):
51+
run_params = {
52+
"file": args.file,
53+
"unit": args.unit,
54+
"function": args.function
55+
}
56+
if args.periodic is None:
57+
rid = remote.run_once(run_params, args.timeout)
58+
print("RID: {}".format(rid))
59+
else:
60+
prid = remote.run_periodic(run_params, args.timeout,
61+
args.periodic)
62+
print("PRID: {}".format(prid))
63+
64+
65+
def _action_cancel(remote, args):
66+
if args.periodic:
67+
remote.cancel_periodic(args.rid)
68+
else:
69+
remote.cancel_once(args.rid)
70+
71+
72+
def _action_show(remote, args):
73+
ce, queue = remote.get_schedule()
74+
if ce is None and not queue:
75+
print("Queue is empty")
76+
else:
77+
table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"])
78+
if ce is not None:
79+
rid, run_params, timeout, t = ce
80+
print("Currently executing RID {} for {:.1f}s".format(rid, t))
81+
row = [rid, run_params["file"]]
82+
for x in run_params["unit"], run_params["function"], timeout:
83+
row.append("-" if x is None else x)
84+
table.add_row(row)
85+
for rid, run_params, timeout in queue:
86+
row = [rid, run_params["file"]]
87+
for x in run_params["unit"], run_params["function"], timeout:
88+
row.append("-" if x is None else x)
89+
table.add_row(row)
90+
print("Run queue:")
91+
print(table)
92+
93+
3594
def main():
    """Parse the command line, connect to the master and run the action.

    The handler is the module-level function named ``_action_<action>``;
    the RPC connection is closed whether or not the handler succeeds.
    """
    args = _get_args()
    remote = Client(args.server, args.port, "master")
    try:
        handler = globals()["_action_" + args.action]
        handler(remote, args)
    finally:
        remote.close_rpc()
51101

‎setup.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#!/usr/bin/env python3
2-
# -*- coding: utf8 -*-
32

43
from setuptools import setup, find_packages
54
from glob import glob
@@ -16,7 +15,7 @@
1615
long_description = open("README.rst").read(),
1716
license = "BSD",
1817
install_requires = [
19-
"sphinx", "numpy", "scipy"
18+
"sphinx", "numpy", "scipy", "prettytable"
2019
],
2120
extras_require = {},
2221
dependency_links = [],

0 commit comments

Comments
 (0)
Please sign in to comment.