Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 255b00f6783f
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 6b283d78d3dc
Choose a head ref
  • 2 commits
  • 5 files changed
  • 1 contributor

Commits on Dec 29, 2014

  1. Copy the full SHA
    02f3781 View commit details
  2. Copy the full SHA
    6b283d7 View commit details
Showing with 158 additions and 42 deletions.
  1. +15 −13 artiq/management/scheduler.py
  2. +39 −14 artiq/management/sync_struct.py
  3. +20 −7 frontend/artiq_client.py
  4. +79 −6 frontend/artiq_gui.py
  5. +5 −2 frontend/artiq_master.py
28 changes: 15 additions & 13 deletions artiq/management/scheduler.py
Original file line number Diff line number Diff line change
@@ -9,9 +9,9 @@ class Scheduler:
def __init__(self, *args, **kwargs):
self.worker = Worker(*args, **kwargs)
self.next_rid = 0
self.queued = Notifier([])
self.queue = Notifier([])
self.queue_count = asyncio.Semaphore(0)
self.periodic = dict()
self.periodic = Notifier(dict())
self.periodic_modified = asyncio.Event()

def new_rid(self):
@@ -20,8 +20,8 @@ def new_rid(self):
return r

def new_prid(self):
prids = set(range(len(self.periodic) + 1))
prids -= set(self.periodic.keys())
prids = set(range(len(self.periodic.backing_struct) + 1))
prids -= set(self.periodic.backing_struct.keys())
return next(iter(prids))

@asyncio.coroutine
@@ -38,14 +38,14 @@ def stop(self):

def run_once(self, run_params, timeout):
rid = self.new_rid()
self.queued.append((rid, run_params, timeout))
self.queue.append((rid, run_params, timeout))
self.queue_count.release()
return rid

def cancel_once(self, rid):
idx = next(idx for idx, (qrid, _, _) in enumerate(self.queued)
idx = next(idx for idx, (qrid, _, _) in enumerate(self.queue)
if qrid == rid)
del self.queued[idx]
del self.queue[idx]

def run_periodic(self, run_params, timeout, period):
prid = self.new_prid()
@@ -61,7 +61,7 @@ def _run_periodic(self):
while True:
min_next_run = None
min_prid = None
for prid, params in self.periodic.items():
for prid, params in self.periodic.backing_struct.items():
if min_next_run is None or params[0] < min_next_run:
min_next_run = params[0]
min_prid = prid
@@ -74,14 +74,15 @@ def _run_periodic(self):
if min_next_run > 0:
return min_next_run

next_run, run_params, timeout, period = self.periodic[min_prid]
next_run, run_params, timeout, period = \
self.periodic.backing_struct[min_prid]
self.periodic[min_prid] = now + period, run_params, timeout, period

rid = self.new_rid()
self.queued.insert(0, (rid, run_params, timeout))
self.queue.insert(0, (rid, run_params, timeout))
result = yield from self.worker.run(run_params, timeout)
print(prid, rid, result)
del self.queued[0]
del self.queue[0]

@asyncio.coroutine
def _schedule(self):
@@ -93,12 +94,13 @@ def _schedule(self):
[ev_queue, ev_periodic],
timeout=next_periodic,
return_when=asyncio.FIRST_COMPLETED)
self.periodic_modified.clear()
for t in pend:
t.cancel()

yield from self._run_periodic()
if ev_queue in done:
rid, run_params, timeout = self.queued.backing_struct[0]
rid, run_params, timeout = self.queue.backing_struct[0]
result = yield from self.worker.run(run_params, timeout)
print(rid, result)
del self.queued[0]
del self.queue[0]
53 changes: 39 additions & 14 deletions artiq/management/sync_struct.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,8 @@


class Subscriber:
def __init__(self, target_builder, notify_cb=None):
def __init__(self, notifier_name, target_builder, notify_cb=None):
self.notifier_name = notifier_name
self.target_builder = target_builder
self.notify_cb = notify_cb

@@ -18,6 +19,7 @@ def connect(self, host, port):
yield from asyncio.open_connection(host, port)
try:
self._writer.write(_init_string)
self._writer.write((self.notifier_name + "\n").encode())
self.receive_task = asyncio.Task(self._receive_cr())
except:
self._writer.close()
@@ -56,8 +58,11 @@ def _receive_cr(self):
target.insert(obj["i"], obj["x"])
elif action == "pop":
target.pop(obj["i"])
elif action == "setitem":
target.__setitem__(obj["key"], obj["value"])
elif action == "delitem":
target.__delitem__(obj["key"])

if self.notify_cb is not None:
self.notify_cb()

@@ -73,32 +78,41 @@ def __init__(self, backing_struct):
def append(self, x):
self.backing_struct.append(x)
if self.publisher is not None:
self.publisher.publish({"action": "append", "x": x})
self.publisher.publish(self, {"action": "append", "x": x})

def insert(self, i, x):
self.backing_struct.insert(i, x)
if self.publisher is not None:
self.publisher.publish({"action": "insert", "i": i, "x": x})
self.publisher.publish(self, {"action": "insert", "i": i, "x": x})

def pop(self, i=-1):
r = self.backing_struct.pop(i)
if self.publisher is not None:
self.publisher.publish({"action": "pop", "i": i})
self.publisher.publish(self, {"action": "pop", "i": i})
return r

def __setitem__(self, key, value):
self.backing_struct.__setitem__(key, value)
if self.publisher is not None:
self.publisher.publish(self, {"action": "setitem",
"key": key,
"value": value})

def __delitem__(self, key):
self.backing_struct.__delitem__(key)
if self.publisher is not None:
self.publisher.publish({"action": "delitem", "key": key})
self.publisher.publish(self, {"action": "delitem", "key": key})


class Publisher(AsyncioServer):
def __init__(self, notifier):
def __init__(self, notifiers):
AsyncioServer.__init__(self)
self.notifier = notifier
self._recipients = set()
self.notifiers = notifiers
self._recipients = {k: set() for k in notifiers.keys()}
self._notifier_names = {id(v): k for k, v in notifiers.items()}

self.notifier.publisher = self
for notifier in notifiers.values():
notifier.publisher = self

@asyncio.coroutine
def _handle_connection_cr(self, reader, writer):
@@ -107,28 +121,39 @@ def _handle_connection_cr(self, reader, writer):
if line != _init_string:
return

obj = {"action": "init", "struct": self.notifier.backing_struct}
line = yield from reader.readline()
if not line:
return
notifier_name = line.decode()[:-1]

try:
notifier = self.notifiers[notifier_name]
except KeyError:
return

obj = {"action": "init", "struct": notifier.backing_struct}
line = pyon.encode(obj) + "\n"
writer.write(line.encode())

queue = asyncio.Queue()
self._recipients.add(queue)
self._recipients[notifier_name].add(queue)
try:
while True:
line = yield from queue.get()
writer.write(line)
# raise exception on connection error
yield from writer.drain()
finally:
self._recipients.remove(queue)
self._recipients[notifier_name].remove(queue)
except ConnectionResetError:
# subscribers disconnecting are a normal occurence
pass
finally:
writer.close()

def publish(self, obj):
def publish(self, notifier, obj):
line = pyon.encode(obj) + "\n"
line = line.encode()
for recipient in self._recipients:
notifier_name = self._notifier_names[id(notifier)]
for recipient in self._recipients[notifier_name]:
recipient.put_nowait(line)
27 changes: 20 additions & 7 deletions frontend/artiq_client.py
Original file line number Diff line number Diff line change
@@ -37,7 +37,6 @@ def _get_args():
help="unit to run")
parser_add.add_argument("file", help="file containing the unit to run")


parser_cancel = subparsers.add_parser("cancel",
help="cancel an experiment")
parser_cancel.add_argument("-p", "--periodic", default=False,
@@ -46,8 +45,11 @@ def _get_args():
parser_cancel.add_argument("rid", type=int,
help="run identifier (RID/PRID)")

parser_show = subparsers.add_parser("show-queue",
help="show the experiment queue")
parser_show_queue = subparsers.add_parser(
"show-queue", help="show the experiment queue")

parser_show_periodic = subparsers.add_parser(
"show-periodic", help="show the periodic experiment table")

return parser.parse_args()

@@ -93,7 +95,7 @@ def _show_periodic(periodic):
if periodic:
table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function",
"Timeout", "Period"])
sp = sorted(periodic.items(), key=lambda x: x[1][0])
sp = sorted(periodic.items(), key=lambda x: (x[1][0], x[0]))
for prid, (next_run, run_params, timeout, period) in sp:
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
prid, run_params["file"]]
@@ -107,6 +109,8 @@ def _show_periodic(periodic):


def _run_subscriber(host, port, subscriber):
if port is None:
port = 8887
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(subscriber.connect(host, port))
@@ -127,9 +131,18 @@ def main():
def init_queue(x):
queue[:] = x
return queue
subscriber = Subscriber(init_queue, lambda: _show_queue(queue))
port = 8887 if args.port is None else args.port
_run_subscriber(args.server, port, subscriber)
subscriber = Subscriber("queue", init_queue,
lambda: _show_queue(queue))
_run_subscriber(args.server, args.port, subscriber)
elif args.action == "show-periodic":
periodic = dict()
def init_periodic(x):
periodic.clear()
periodic.update(x)
return periodic
subscriber = Subscriber("periodic", init_periodic,
lambda: _show_periodic(periodic))
_run_subscriber(args.server, args.port, subscriber)
else:
port = 8888 if args.port is None else args.port
remote = Client(args.server, port, "schedule_control")
85 changes: 79 additions & 6 deletions frontend/artiq_gui.py
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@

import argparse
import asyncio
import time

import gbulb
from gi.repository import Gtk
@@ -19,8 +20,8 @@ def __init__(self, queue_store, init):
def _convert(self, x):
rid, run_params, timeout = x
row = [rid, run_params["file"]]
for x in run_params["unit"], run_params["function"], timeout:
row.append("-" if x is None else str(x))
for e in run_params["unit"], run_params["function"], timeout:
row.append("-" if e is None else str(e))
return row

def append(self, x):
@@ -33,9 +34,57 @@ def __delitem__(self, key):
del self.queue_store[key]


class PeriodicStoreSyncer:
def __init__(self, periodic_store, init):
self.periodic_store = periodic_store
self.periodic_store.clear()
self.order = []
for prid, x in sorted(init.items(), key=lambda e: (e[1][0], e[0])):
self.periodic_store.append(self._convert(prid, x))
self.order.append((x[0], prid))

def _convert(self, prid, x):
next_run, run_params, timeout, period = x
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
prid, run_params["file"]]
for e in run_params["unit"], run_params["function"], timeout:
row.append("-" if e is None else str(e))
row.append(str(period))
return row

def _find_index(self, prid):
for i, e in enumerate(self.periodic_store):
if e[1] == prid:
return i
raise KeyError

def __setitem__(self, prid, x):
try:
i = self._find_index(prid)
except KeyError:
pass
else:
del self.periodic_store[i]
del self.order[i]
for i, o in enumerate(self.order):
if o > (x[0], prid):
break
self.periodic_store.insert(i, self._convert(prid, x))
self.order.insert(i, (x[0], prid))

def __delitem__(self, key):
i = self._find_index(key)
del self.periodic_store[i]
del self.order[i]


class SchedulerWindow(Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self, title="Scheduler")
self.set_border_width(10)

vpane = Gtk.VPaned()
self.add(vpane)

self.queue_store = Gtk.ListStore(int, str, str, str, str)
tree = Gtk.TreeView(self.queue_store)
@@ -44,20 +93,44 @@ def __init__(self):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=i)
tree.append_column(column)
self.add(tree)
scroll = Gtk.ScrolledWindow()
scroll.add(tree)
vpane.add1(scroll)

self.periodic_store = Gtk.ListStore(str, int, str, str, str, str, str)
tree = Gtk.TreeView(self.periodic_store)
for i, title in enumerate(["Next run", "PRID", "File", "Unit",
"Function", "Timeout", "Period"]):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=i)
tree.append_column(column)
scroll = Gtk.ScrolledWindow()
scroll.add(tree)
vpane.add2(scroll)

@asyncio.coroutine
def sub_connect(self, host, port):
self.subscriber = Subscriber(self.init_queue_store)
yield from self.subscriber.connect(host, port)
self.queue_subscriber = Subscriber("queue", self.init_queue_store)
yield from self.queue_subscriber.connect(host, port)
try:
self.periodic_subscriber = Subscriber(
"periodic", self.init_periodic_store)
yield from self.periodic_subscriber.connect(host, port)
except:
yield from self.queue_subscriber.close()
raise

@asyncio.coroutine
def sub_close(self):
yield from self.subscriber.close()
yield from self.periodic_subscriber.close()
yield from self.queue_subscriber.close()

def init_queue_store(self, init):
return QueueStoreSyncer(self.queue_store, init)

def init_periodic_store(self, init):
return PeriodicStoreSyncer(self.periodic_store, init)


def _get_args():
parser = argparse.ArgumentParser(description="ARTIQ GUI client")
Loading