Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: d6ced1c7809f
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: ae4615281f90
Choose a head ref
  • 4 commits
  • 7 files changed
  • 1 contributor

Commits on May 24, 2015

  1. Copy the full SHA
    a213738 View commit details
  2. Copy the full SHA
    e611e17 View commit details
  3. Copy the full SHA
    fc44950 View commit details
  4. Copy the full SHA
    ae46152 View commit details
Showing with 130 additions and 37 deletions.
  1. +10 −1 artiq/frontend/artiq_gui.py
  2. +79 −16 artiq/gui/explorer.py
  3. +22 −7 artiq/master/scheduler.py
  4. +3 −2 artiq/master/worker.py
  5. +7 −3 artiq/master/worker_impl.py
  6. +8 −7 artiq/protocols/pyon.py
  7. +1 −1 examples/master/repository/flopping_f_simulation.py
11 changes: 10 additions & 1 deletion artiq/frontend/artiq_gui.py
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
from pyqtgraph import dockarea

from artiq.protocols.file_db import FlatFileDB
from artiq.protocols.pc_rpc import AsyncioClient
from artiq.gui.explorer import ExplorerDock
from artiq.gui.parameters import ParametersDock
from artiq.gui.log import LogDock
@@ -43,6 +44,11 @@ def main():
asyncio.set_event_loop(loop)
atexit.register(lambda: loop.close())

schedule_ctl = AsyncioClient()
loop.run_until_complete(schedule_ctl.connect_rpc(
args.server, args.port_control, "master_schedule"))
atexit.register(lambda: schedule_ctl.close_rpc())

win = QtGui.QMainWindow()
area = dockarea.DockArea()
win.setCentralWidget(area)
@@ -52,8 +58,11 @@ def main():
win.resize(1400, 800)
win.setWindowTitle("ARTIQ")

d_explorer = ExplorerDock()
d_explorer = ExplorerDock(status_bar, schedule_ctl)
area.addDock(d_explorer, "top")
loop.run_until_complete(d_explorer.sub_connect(
args.server, args.port_notify))
atexit.register(lambda: loop.run_until_complete(d_explorer.sub_close()))

d_params = ParametersDock()
area.addDock(d_params, "right", d_explorer)
95 changes: 79 additions & 16 deletions artiq/gui/explorer.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,104 @@
import asyncio

from quamash import QtGui, QtCore
from pyqtgraph import dockarea
from pyqtgraph import LayoutWidget

from artiq.protocols.sync_struct import Subscriber
from artiq.gui.tools import DictSyncModel


class _ExplistModel(DictSyncModel):
    """Model over the synchronized experiment list.

    Passes the single-entry list ``["Experiment"]`` to ``DictSyncModel``
    (presumably the column headers -- confirm against ``DictSyncModel``).
    Both the sort key and the converted value of an entry are its key.
    """

    def __init__(self, parent, init):
        headers = ["Experiment"]
        DictSyncModel.__init__(self, headers, parent, init)

    def sort_key(self, k, v):
        """Return the entry key ``k``; the value ``v`` is not used."""
        return k

    def convert(self, k, v, column):
        """Return the entry key ``k`` whatever ``column`` is requested."""
        return k


class ExplorerDock(dockarea.Dock):
def __init__(self):
def __init__(self, status_bar, schedule_ctl):
dockarea.Dock.__init__(self, "Explorer", size=(1100, 400))

self.status_bar = status_bar
self.schedule_ctl = schedule_ctl

splitter = QtGui.QSplitter(QtCore.Qt.Horizontal)
self.addWidget(splitter)

grid = LayoutWidget()
splitter.addWidget(grid)

el = QtGui.QListView()
grid.addWidget(el, 0, 0, colspan=4)
self.el = QtGui.QListView()
grid.addWidget(self.el, 0, 0, colspan=4)

datetime = QtGui.QDateTimeEdit()
datetime.setDisplayFormat("MMM d yyyy hh:mm:ss")
datetime.setCalendarPopup(True)
datetime.setDate(QtCore.QDate.currentDate())
datetime_en = QtGui.QCheckBox("Set due date:")
grid.addWidget(datetime_en, 1, 0)
grid.addWidget(datetime, 1, 1, colspan=3)
self.datetime = QtGui.QDateTimeEdit()
self.datetime.setDisplayFormat("MMM d yyyy hh:mm:ss")
self.datetime.setCalendarPopup(True)
self.datetime.setDate(QtCore.QDate.currentDate())
self.datetime_en = QtGui.QCheckBox("Set due date:")
grid.addWidget(self.datetime_en, 1, 0)
grid.addWidget(self.datetime, 1, 1, colspan=3)

pipeline = QtGui.QLineEdit()
pipeline.insert("main")
self.pipeline = QtGui.QLineEdit()
self.pipeline.insert("main")
grid.addLabel("Pipeline:", 2, 0)
grid.addWidget(pipeline, 2, 1)
grid.addWidget(self.pipeline, 2, 1)

priority = QtGui.QSpinBox()
priority.setRange(-99, 99)
self.priority = QtGui.QSpinBox()
self.priority.setRange(-99, 99)
grid.addLabel("Priority:", 2, 2)
grid.addWidget(priority, 2, 3)
grid.addWidget(self.priority, 2, 3)

submit = QtGui.QPushButton("Submit")
grid.addWidget(submit, 3, 0, colspan=4)
submit.clicked.connect(self.submit_clicked)

placeholder = QtGui.QWidget()
splitter.addWidget(placeholder)

@asyncio.coroutine
def sub_connect(self, host, port):
    """Create the "explist" subscriber and connect it to ``host:port``.

    The subscriber is stored on ``self.explist_subscriber`` and is given
    ``self.init_explist_model`` as its callback.
    """
    subscriber = Subscriber("explist", self.init_explist_model)
    self.explist_subscriber = subscriber
    yield from subscriber.connect(host, port)

@asyncio.coroutine
def sub_close(self):
    """Close the subscriber stored on ``self.explist_subscriber``."""
    subscriber = self.explist_subscriber
    yield from subscriber.close()

def init_explist_model(self, init):
    """Build the experiment-list model from ``init`` and attach it.

    The new model is stored on ``self.explist_model``, installed on the
    list view ``self.el``, and returned to the caller.
    """
    model = _ExplistModel(self.el, init)
    self.explist_model = model
    self.el.setModel(model)
    return model

@asyncio.coroutine
def submit(self, pipeline_name, file, experiment, arguments,
           priority, due_date):
    """Submit an experiment through ``self.schedule_ctl`` and report its RID.

    ``file``, ``experiment`` and ``arguments`` are packed into the ``expid``
    dictionary, which is passed to ``schedule_ctl.submit`` together with
    ``pipeline_name``, ``priority`` and ``due_date``. The returned RID is
    shown in the status bar.
    """
    expid = dict(file=file, experiment=experiment, arguments=arguments)
    rid = yield from self.schedule_ctl.submit(
        pipeline_name, expid, priority, due_date)
    message = "Submitted RID {}".format(rid)
    self.status_bar.showMessage(message)

def submit_clicked(self):
    """Schedule submission of the experiment selected in the list view.

    Does nothing when no row is selected. Otherwise the first selected row
    is looked up in ``self.explist_model`` and ``self.submit`` is scheduled
    on the event loop with the pipeline name and priority read from the
    form, an empty arguments dictionary, and the due date (``None`` unless
    the "Set due date" box is checked).
    """
    selection = self.el.selectedIndexes()
    if not selection:
        return

    row = selection[0].row()
    key = self.explist_model.row_to_key[row]
    expinfo = self.explist_model.data[key]

    if self.datetime_en.isChecked():
        # Qt reports milliseconds since the epoch; convert to seconds.
        due_date = self.datetime.dateTime().toMSecsSinceEpoch()/1000
    else:
        due_date = None

    # asyncio.async() cannot even be parsed on Python 3.7+ ("async" is a
    # reserved keyword); asyncio.ensure_future() is its replacement.
    asyncio.ensure_future(self.submit(self.pipeline.text(),
                                      expinfo["file"],
                                      expinfo["experiment"],
                                      {},
                                      self.priority.value(),
                                      due_date))
29 changes: 22 additions & 7 deletions artiq/master/scheduler.py
Original file line number Diff line number Diff line change
@@ -98,7 +98,8 @@ def close(self):

@asyncio.coroutine
def prepare(self):
yield from self._prepare(self.rid, self.pipeline_name, self.expid)
yield from self._prepare(self.rid, self.pipeline_name, self.expid,
self.priority)

run = _mk_worker_method("run")
resume = _mk_worker_method("resume")
@@ -160,7 +161,8 @@ def _do(self):


class PrepareStage(TaskObject):
def __init__(self, pool, outq):
def __init__(self, deleter, pool, outq):
self.deleter = deleter
self.pool = pool
self.outq = outq

@@ -185,7 +187,13 @@ def _push_runs(self):
return None
if run.due_date is None or run.due_date < now:
run.status = RunStatus.preparing
yield from run.prepare()
try:
yield from run.prepare()
except:
logger.warning("got worker exception in prepare stage, "
"deleting RID %d",
run.rid, exc_info=True)
self.deleter.delete(run.rid)
run.status = RunStatus.prepare_done
yield from self.outq.put(run)
else:
@@ -235,7 +243,8 @@ def _do(self):
run.status = RunStatus.running
completed = yield from run.run()
except:
logger.warning("got worker exception, deleting RID %d",
logger.warning("got worker exception in run stage, "
"deleting RID %d",
run.rid, exc_info=True)
self.deleter.delete(run.rid)
else:
@@ -257,16 +266,22 @@ def _do(self):
while True:
run = yield from self.inq.get()
run.status = RunStatus.analyzing
yield from run.analyze()
yield from run.write_results()
try:
yield from run.analyze()
yield from run.write_results()
except:
logger.warning("got worker exception in analyze stage, "
"deleting RID %d",
run.rid, exc_info=True)
self.deleter.delete(run.rid)
run.status = RunStatus.analyze_done
self.deleter.delete(run.rid)


class Pipeline:
def __init__(self, ridc, deleter, worker_handlers, notifier):
self.pool = RunPool(ridc, worker_handlers, notifier)
self._prepare = PrepareStage(self.pool, asyncio.Queue(maxsize=1))
self._prepare = PrepareStage(deleter, self.pool, asyncio.Queue(maxsize=1))
self._run = RunStage(deleter, self._prepare.outq, asyncio.Queue(maxsize=1))
self._analyze = AnalyzeStage(deleter, self._run.outq)

5 changes: 3 additions & 2 deletions artiq/master/worker.py
Original file line number Diff line number Diff line change
@@ -202,14 +202,15 @@ def _worker_action(self, obj, timeout=None):
return completed

@asyncio.coroutine
def prepare(self, rid, pipeline_name, expid):
def prepare(self, rid, pipeline_name, expid, priority):
self.rid = rid
yield from self._create_process()
yield from self._worker_action(
{"action": "prepare",
"rid": rid,
"pipeline_name": pipeline_name,
"expid": expid},
"expid": expid,
"priority": priority},
self.prepare_timeout)

@asyncio.coroutine
10 changes: 7 additions & 3 deletions artiq/master/worker_impl.py
Original file line number Diff line number Diff line change
@@ -79,12 +79,13 @@ class Scheduler:
pause = staticmethod(make_parent_action("pause", ""))

submit = staticmethod(make_parent_action("scheduler_submit",
"pipeline_name expid due_date"))
"pipeline_name expid priority due_date"))
cancel = staticmethod(make_parent_action("scheduler_cancel", "rid"))

def __init__(self, pipeline_name, expid):
def __init__(self, pipeline_name, expid, priority):
self.pipeline_name = pipeline_name
self.expid = expid
self.priority = priority


def get_exp(file, exp):
@@ -121,9 +122,12 @@ def main():
rid = obj["rid"]
pipeline_name = obj["pipeline_name"]
expid = obj["expid"]
priority = obj["priority"]
exp = get_exp(expid["file"], expid["experiment"])
exp_inst = exp(dbh,
scheduler=Scheduler(pipeline_name, expid),
scheduler=Scheduler(pipeline_name,
expid,
priority),
**expid["arguments"])
rdb.build()
put_object({"action": "completed"})
15 changes: 8 additions & 7 deletions artiq/protocols/pyon.py
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@

import base64
from fractions import Fraction
import os
import tempfile

import numpy

@@ -158,18 +160,17 @@ def decode(s):


def store_file(filename, x):
"""Encodes a Python object and writes it to the specified file.
"""
"""Encodes a Python object and writes it to the specified file."""
contents = encode(x, True)
with open(filename, "w") as f:
directory = os.path.abspath(os.path.dirname(filename))
with tempfile.NamedTemporaryFile("w", dir=directory, delete=False) as f:
f.write(contents)
f.write("\n")
tmpname = f.name
os.replace(tmpname, filename)


def load_file(filename):
"""Parses the specified file and returns the decoded Python object.
"""
"""Parses the specified file and returns the decoded Python object."""
with open(filename, "r") as f:
return decode(f.read())
2 changes: 1 addition & 1 deletion examples/master/repository/flopping_f_simulation.py
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ def run(self):
self.brightness.append(brightness)
time.sleep(0.1)
self.scheduler.submit(self.scheduler.pipeline_name, self.scheduler.expid,
time.time() + 20)
self.scheduler.priority, time.time() + 20)

def analyze(self):
popt, pcov = curve_fit(model_numpy,