Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 34aacd3c5f8e
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 9d4073ef3689
Choose a head ref
  • 2 commits
  • 7 files changed
  • 1 contributor

Commits on Jul 9, 2015

  1. Copy the full SHA
    96a5d73 View commit details
  2. Copy the full SHA
    9d4073e View commit details
Showing with 41 additions and 38 deletions.
  1. +1 −4 artiq/frontend/artiq_master.py
  2. +18 −6 artiq/language/experiment.py
  3. +6 −4 artiq/master/scheduler.py
  4. +10 −6 artiq/master/worker.py
  5. +4 −1 artiq/master/worker_impl.py
  6. +0 −16 artiq/protocols/file_db.py
  7. +2 −1 artiq/test/worker.py
5 changes: 1 addition & 4 deletions artiq/frontend/artiq_master.py
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@

from artiq.protocols.pc_rpc import Server
from artiq.protocols.sync_struct import Publisher
from artiq.protocols.file_db import FlatFileDB, SimpleHistory
from artiq.protocols.file_db import FlatFileDB
from artiq.master.scheduler import Scheduler
from artiq.master.results import RTResults, get_last_rid
from artiq.master.repository import Repository
@@ -36,8 +36,6 @@ def main():
init_logger(args)
ddb = FlatFileDB("ddb.pyon")
pdb = FlatFileDB("pdb.pyon")
simplephist = SimpleHistory(30)
pdb.hooks.append(simplephist)
rtr = RTResults()
repository = Repository()

@@ -74,7 +72,6 @@ def main():
"schedule": scheduler.notifier,
"devices": ddb.data,
"parameters": pdb.data,
"parameters_simplehist": simplephist.history,
"rt_results": rtr.groups,
"explist": repository.explist
})
24 changes: 18 additions & 6 deletions artiq/language/experiment.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from inspect import isclass

__all__ = ["Experiment", "has_analyze", "is_experiment"]
__all__ = ["Experiment", "is_experiment"]


class Experiment:
@@ -9,11 +9,28 @@ class Experiment:
Deriving from this class enables automatic experiment discovery in
Python modules.
"""
def prepare(self):
"""Entry point for pre-computing data necessary for running the
experiment.
Doing such computations outside of ``run`` enables more efficient
scheduling of multiple experiments that need to access the shared
hardware during part of their execution.
This method must not interact with the hardware.
"""
pass

def run(self):
"""The main entry point of the experiment.
This method must be overloaded by the user to implement the main
control flow of the experiment.
This method may interact with the hardware.
The experiment may call the scheduler's ``pause`` method while in
``run``.
"""
raise NotImplementedError

@@ -32,11 +49,6 @@ def analyze(self):
pass


def has_analyze(experiment):
"""Checks if an experiment instance overloaded its ``analyze`` method."""
return experiment.analyze.__func__ is not Experiment.analyze


def is_experiment(o):
"""Checks if a Python object is an instantiable experiment."""
return isclass(o) and issubclass(o, Experiment) and o is not Experiment
10 changes: 6 additions & 4 deletions artiq/master/scheduler.py
Original file line number Diff line number Diff line change
@@ -99,13 +99,14 @@ def close(self):
yield from self.worker.close()
del self._notifier[self.rid]

_prepare = _mk_worker_method("prepare")
_build = _mk_worker_method("build")

@asyncio.coroutine
def prepare(self):
yield from self._prepare(self.rid, self.pipeline_name, self.expid,
self.priority)
def build(self):
yield from self._build(self.rid, self.pipeline_name, self.expid,
self.priority)

prepare = _mk_worker_method("prepare")
run = _mk_worker_method("run")
resume = _mk_worker_method("resume")
analyze = _mk_worker_method("analyze")
@@ -188,6 +189,7 @@ def _push_runs(self):
run.status = RunStatus.preparing
self.flush_tracker.add(run.rid)
try:
yield from run.build()
yield from run.prepare()
except:
logger.warning("got worker exception in prepare stage, "
16 changes: 10 additions & 6 deletions artiq/master/worker.py
Original file line number Diff line number Diff line change
@@ -28,11 +28,11 @@ class WorkerError(Exception):
class Worker:
def __init__(self, handlers,
send_timeout=0.5, term_timeout=1.0,
prepare_timeout=15.0, results_timeout=15.0):
build_timeout=15.0, results_timeout=15.0):
self.handlers = handlers
self.send_timeout = send_timeout
self.term_timeout = term_timeout
self.prepare_timeout = prepare_timeout
self.build_timeout = build_timeout
self.results_timeout = results_timeout

self.rid = None
@@ -142,7 +142,7 @@ def _recv(self, timeout):
[self.process.stdout.readline(), self.closed.wait()],
timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
if all(f.cancelled() for f in fs):
raise WorkerTimeout("Timeout sending data to worker")
raise WorkerTimeout("Timeout receiving data from worker")
if self.closed.is_set():
raise WorkerError("Data transmission to worker cancelled")
line = fs[0].result()
@@ -209,16 +209,20 @@ def _worker_action(self, obj, timeout=None):
return completed

@asyncio.coroutine
def prepare(self, rid, pipeline_name, expid, priority):
def build(self, rid, pipeline_name, expid, priority):
self.rid = rid
yield from self._create_process()
yield from self._worker_action(
{"action": "prepare",
{"action": "build",
"rid": rid,
"pipeline_name": pipeline_name,
"expid": expid,
"priority": priority},
self.prepare_timeout)
self.build_timeout)

@asyncio.coroutine
def prepare(self):
yield from self._worker_action({"action": "prepare"})

@asyncio.coroutine
def run(self):
5 changes: 4 additions & 1 deletion artiq/master/worker_impl.py
Original file line number Diff line number Diff line change
@@ -117,7 +117,7 @@ def main():
while True:
obj = get_object()
action = obj["action"]
if action == "prepare":
if action == "build":
start_time = time.localtime()
rid = obj["rid"]
pipeline_name = obj["pipeline_name"]
@@ -131,6 +131,9 @@ def main():
**expid["arguments"])
rdb.build()
put_object({"action": "completed"})
elif action == "prepare":
exp_inst.prepare()
put_object({"action": "completed"})
elif action == "run":
exp_inst.run()
put_object({"action": "completed"})
16 changes: 0 additions & 16 deletions artiq/protocols/file_db.py
Original file line number Diff line number Diff line change
@@ -36,19 +36,3 @@ def delete(self, name):
timestamp = time()
for hook in self.hooks:
hook.delete(timestamp, name)


class SimpleHistory:
def __init__(self, depth):
self.depth = depth
self.history = Notifier([])

def set(self, timestamp, name, value):
if len(self.history.read) >= self.depth:
del self.history[0]
self.history.append((timestamp, name, value))

def delete(self, timestamp, name):
if len(self.history.read) >= self.depth:
del self.history[0]
self.history.append((timestamp, name))
3 changes: 2 additions & 1 deletion artiq/test/worker.py
Original file line number Diff line number Diff line change
@@ -32,7 +32,8 @@ def run(self):
@asyncio.coroutine
def _call_worker(worker, expid):
try:
yield from worker.prepare(0, "main", expid, 0)
yield from worker.build(0, "main", expid, 0)
yield from worker.prepare()
yield from worker.run()
yield from worker.analyze()
finally: