Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: cca6a17cfb37
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 0dd5692c326a
Choose a head ref
  • 2 commits
  • 8 files changed
  • 1 contributor

Commits on Feb 20, 2015

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    4d21b78 View commit details
  2. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    0dd5692 View commit details
13 changes: 7 additions & 6 deletions artiq/frontend/artiq_client.py
Original file line number Diff line number Diff line change
@@ -104,19 +104,20 @@ def _action_submit(remote, args):
run_params = {
"file": args.file,
"unit": args.unit,
"timeout": args.timeout,
"arguments": arguments,
"rtr_group": args.rtr_group if args.rtr_group is not None \
else args.file
}
if args.timed is None:
rid = remote.run_queued(run_params, args.timeout)
rid = remote.run_queued(run_params)
print("RID: {}".format(rid))
else:
if args.timed == "now":
next_time = None
else:
next_time = time.mktime(parse_date(args.timed).timetuple())
trid = remote.run_timed(run_params, args.timeout, next_time)
trid = remote.run_timed(run_params, next_time)
print("TRID: {}".format(trid))


@@ -147,9 +148,9 @@ def _show_queue(queue):
clear_screen()
if queue:
table = PrettyTable(["RID", "File", "Unit", "Timeout", "Arguments"])
for rid, run_params, timeout in queue:
for rid, run_params in queue:
row = [rid, run_params["file"]]
for x in run_params["unit"], timeout:
for x in run_params["unit"], run_params["timeout"]:
row.append("-" if x is None else x)
row.append(format_run_arguments(run_params["arguments"]))
table.add_row(row)
@@ -164,10 +165,10 @@ def _show_timed(timed):
table = PrettyTable(["Next run", "TRID", "File", "Unit",
"Timeout", "Arguments"])
sp = sorted(timed.items(), key=lambda x: (x[1][0], x[0]))
for trid, (next_run, run_params, timeout) in sp:
for trid, (next_run, run_params) in sp:
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
trid, run_params["file"]]
for x in run_params["unit"], timeout:
for x in run_params["unit"], run_params["timeout"]:
row.append("-" if x is None else x)
row.append(format_run_arguments(run_params["arguments"]))
table.add_row(row)
15 changes: 12 additions & 3 deletions artiq/frontend/artiq_run.py
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ def __init__(self):
self.next_rid = 0
self.next_trid = 0

def run_queued(self, run_params, timeout):
def run_queued(self, run_params):
rid = self.next_rid
self.next_rid += 1
print("Queuing: {}, RID={}".format(run_params, rid))
@@ -48,7 +48,7 @@ def run_queued(self, run_params, timeout):
def cancel_queued(self, rid):
print("Cancelling RID {}".format(rid))

def run_timed(self, run_params, timeout, next_run):
def run_timed(self, run_params, next_run):
trid = self.next_trid
self.next_trid += 1
next_run_s = time.strftime("%m/%d %H:%M:%S", time.localtime(next_run))
@@ -137,7 +137,16 @@ def main():
print("Failed to parse run arguments")
sys.exit(1)

unit_inst = unit(dbh, scheduler=DummyScheduler(), **arguments)
run_params = {
"file": args.file,
"unit": args.unit,
"timeout": None,
"arguments": arguments
}
unit_inst = unit(dbh,
scheduler=DummyScheduler(),
run_params=run_params,
**run_params["arguments"])
unit_inst.run()
if hasattr(unit_inst, "analyze"):
unit_inst.analyze()
3 changes: 2 additions & 1 deletion artiq/gui/explorer.py
Original file line number Diff line number Diff line change
@@ -147,7 +147,8 @@ def run(self, widget):
run_params = {
"file": data["file"],
"unit": data["unit"],
"timeout": None,
"arguments": arguments,
"rtr_group": data["file"]
}
asyncio.Task(self.schedule_ctl.run_queued(run_params, None))
asyncio.Task(self.schedule_ctl.run_queued(run_params))
8 changes: 4 additions & 4 deletions artiq/gui/scheduler.py
Original file line number Diff line number Diff line change
@@ -10,9 +10,9 @@

class _QueueStoreSyncer(ListSyncer):
def convert(self, x):
rid, run_params, timeout = x
rid, run_params = x
row = [rid, run_params["file"]]
for e in run_params["unit"], timeout:
for e in run_params["unit"], run_params["timeout"]:
row.append("-" if e is None else str(e))
row.append(format_run_arguments(run_params["arguments"]))
return row
@@ -24,10 +24,10 @@ def order_key(self, kv_pair):
return (kv_pair[1][0], kv_pair[0])

def convert(self, trid, x):
next_run, run_params, timeout = x
next_run, run_params = x
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
trid, run_params["file"]]
for e in run_params["unit"], timeout:
for e in run_params["unit"], run_params["timeout"]:
row.append("-" if e is None else str(e))
row.append(format_run_arguments(run_params["arguments"]))
return row
24 changes: 12 additions & 12 deletions artiq/master/scheduler.py
Original file line number Diff line number Diff line change
@@ -37,37 +37,37 @@ def stop(self):
del self.task
yield from self.worker.end_process()

def run_queued(self, run_params, timeout):
def run_queued(self, run_params):
rid = self.new_rid()
self.queue.append((rid, run_params, timeout))
self.queue.append((rid, run_params))
self.queue_modified.set()
return rid

def cancel_queued(self, rid):
idx = next(idx for idx, (qrid, _, _)
idx = next(idx for idx, (qrid, _)
in enumerate(self.queue.read)
if qrid == rid)
if idx == 0:
# Cannot cancel when already running
raise NotImplementedError
del self.queue[idx]

def run_timed(self, run_params, timeout, next_run):
def run_timed(self, run_params, next_run):
if next_run is None:
next_run = time()
trid = self.new_trid()
self.timed[trid] = next_run, run_params, timeout
self.timed[trid] = next_run, run_params
self.timed_modified.set()
return trid

def cancel_timed(self, trid):
del self.timed[trid]

@asyncio.coroutine
def _run(self, rid, run_params, timeout):
def _run(self, rid, run_params):
self.run_cb(rid, run_params)
try:
yield from self.worker.run(run_params, timeout)
yield from self.worker.run(run_params)
except Exception as e:
print("RID {} failed:".format(rid))
print(e)
@@ -92,21 +92,21 @@ def _run_timed(self):
if min_next_run > 0:
return min_next_run

next_run, run_params, timeout = self.timed.read[min_trid]
next_run, run_params = self.timed.read[min_trid]
del self.timed[min_trid]

rid = self.new_rid()
self.queue.insert(0, (rid, run_params, timeout))
yield from self._run(rid, run_params, timeout)
self.queue.insert(0, (rid, run_params))
yield from self._run(rid, run_params)
del self.queue[0]

@asyncio.coroutine
def _schedule(self):
while True:
next_timed = yield from self._run_timed()
if self.queue.read:
rid, run_params, timeout = self.queue.read[0]
yield from self._run(rid, run_params, timeout)
rid, run_params = self.queue.read[0]
yield from self._run(rid, run_params)
del self.queue[0]
else:
self.queue_modified.clear()
4 changes: 2 additions & 2 deletions artiq/master/worker.py
Original file line number Diff line number Diff line change
@@ -60,13 +60,13 @@ def _recv(self, timeout):
return obj

@asyncio.coroutine
def run(self, run_params, result_timeout):
def run(self, run_params):
yield from self._send(run_params, self.send_timeout)
obj = yield from self._recv(self.start_reply_timeout)
if obj != "ack":
raise WorkerFailed("Incorrect acknowledgement")
while True:
obj = yield from self._recv(result_timeout)
obj = yield from self._recv(run_params["timeout"])
action = obj["action"]
if action == "report_completed":
if obj["status"] != "ok":
16 changes: 9 additions & 7 deletions artiq/master/worker_impl.py
Original file line number Diff line number Diff line change
@@ -59,11 +59,10 @@ def publish_rt_results(notifier, data):


class Scheduler:
run_queued = make_parent_action("scheduler_run_queued",
"run_params timeout")
run_queued = make_parent_action("scheduler_run_queued", "run_params")
cancel_queued = make_parent_action("scheduler_cancel_queued", "rid")
run_timed = make_parent_action("scheduler_run_timed",
"run_params timeout next_run")
"run_params next_run")
cancel_timed = make_parent_action("scheduler_cancel_timed", "trid")


@@ -82,8 +81,8 @@ def get_unit(file, unit):
return getattr(module, unit)


def run(obj):
unit = get_unit(obj["file"], obj["unit"])
def run(run_params):
unit = get_unit(run_params["file"], run_params["unit"])

realtime_results = unit.realtime_results()
init_rt_results(realtime_results)
@@ -101,7 +100,10 @@ def run(obj):
dbh = DBHub(ParentDDB, ParentPDB, rdb)
try:
try:
unit_inst = unit(dbh, scheduler=Scheduler, **obj["arguments"])
unit_inst = unit(dbh,
scheduler=Scheduler,
run_params=run_params,
**run_params["arguments"])
unit_inst.run()
if hasattr(unit_inst, "analyze"):
unit_inst.analyze()
@@ -115,7 +117,7 @@ def run(obj):
finally:
dbh.close()

filename = obj["file"] + ".h5"
filename = run_params["file"] + ".h5"
f = h5py.File(filename, "w")
try:
rdb.write_hdf5(f)
9 changes: 1 addition & 8 deletions examples/flopping_f_simulation.py
Original file line number Diff line number Diff line change
@@ -52,14 +52,7 @@ def run(self):
self.frequency.append(frequency)
self.brightness.append(brightness)
time.sleep(0.1)

run_params = {
"file": "flopping_f_simulation.py",
"unit": None,
"arguments": dict(),
"rtr_group": "flopping_f_simulation.py"
}
self.scheduler.run_timed(run_params, None, time.time() + 20)
self.scheduler.run_timed(self.run_params, time.time() + 20)

def analyze(self):
popt, pcov = curve_fit(model_numpy,