Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: ff93e3c14978
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: e6b230b71c0e
Choose a head ref
  • 3 commits
  • 7 files changed
  • 1 contributor

Commits on Oct 29, 2015

  1. Copy the full SHA
    0e375e4 View commit details

Commits on Oct 30, 2015

  1. Copy the full SHA
    2c77c80 View commit details
  2. Copy the full SHA
    e6b230b View commit details
7 changes: 6 additions & 1 deletion artiq/frontend/artiq_master.py
Original file line number Diff line number Diff line change
@@ -83,7 +83,12 @@ def main():
"log": log_worker
}
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
worker_handlers["scheduler_submit"] = scheduler.submit
worker_handlers.update({
"scheduler_submit": scheduler.submit,
"scheduler_delete": scheduler.delete,
"scheduler_request_termination": scheduler.request_termination,
"scheduler_get_status": scheduler.get_status
})
scheduler.start()
atexit.register(lambda: loop.run_until_complete(scheduler.stop()))

14 changes: 11 additions & 3 deletions artiq/frontend/artiq_run.py
Original file line number Diff line number Diff line change
@@ -32,20 +32,28 @@ def run(self):

class DummyScheduler:
def __init__(self):
self.next_rid = 0
self.rid = 0
self.pipeline_name = "main"
self.priority = 0
self.expid = None

self._next_rid = 1

def submit(self, pipeline_name, expid, priority, due_date, flush):
rid = self.next_rid
self.next_rid += 1
rid = self._next_rid
self._next_rid += 1
logger.info("Submitting: %s, RID=%s", expid, rid)
return rid

def delete(self, rid):
logger.info("Deleting RID %s", rid)

def request_termination(self, rid):
logger.info("Requesting termination of RID %s", rid)

def get_status(self):
return dict()

def pause(self):
pass

7 changes: 2 additions & 5 deletions artiq/gui/explorer.py
Original file line number Diff line number Diff line change
@@ -368,12 +368,9 @@ def submit(self, pipeline, key, priority, due_date, flush):
arguments = self.argeditor_states[key]["argument_values"]
except KeyError:
arguments = dict()
asyncio.ensure_future(self.submit_task(self.pipeline.text(),
expinfo["file"],
asyncio.ensure_future(self.submit_task(pipeline, expinfo["file"],
expinfo["class_name"],
arguments,
priority,
due_date,
arguments, priority, due_date,
flush))

def submit_clicked(self):
8 changes: 8 additions & 0 deletions artiq/master/scheduler.py
Original file line number Diff line number Diff line change
@@ -412,6 +412,7 @@ async def stop(self):
logger.warning("some pipelines were not garbage-collected")

def submit(self, pipeline_name, expid, priority, due_date, flush):
"""Submits a new run."""
# mutates expid to insert head repository revision if None
if self._terminated:
return
@@ -427,9 +428,11 @@ def submit(self, pipeline_name, expid, priority, due_date, flush):
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)

def delete(self, rid):
"""Kills the run with the specified RID."""
self._deleter.delete(rid)

def request_termination(self, rid):
"""Requests graceful termination of the run with the specified RID."""
for pipeline in self._pipelines.values():
if rid in pipeline.pool.runs:
run = pipeline.pool.runs[rid]
@@ -438,3 +441,8 @@ def request_termination(self, rid):
else:
self.delete(rid)
break

def get_status(self):
"""Returns a dictionary containing information about the runs currently
tracked by the scheduler."""
return self.notifier.read
10 changes: 7 additions & 3 deletions artiq/master/worker_impl.py
Original file line number Diff line number Diff line change
@@ -95,9 +95,13 @@ def pause(self):
raise TerminationRequested

submit = staticmethod(make_parent_action("scheduler_submit"))
cancel = staticmethod(make_parent_action("scheduler_cancel"))
delete = staticmethod(make_parent_action("scheduler_delete"))
request_termination = staticmethod(
make_parent_action("scheduler_request_termination"))
get_status = staticmethod(make_parent_action("scheduler_get_status"))

def set_run_info(self, pipeline_name, expid, priority):
def set_run_info(self, rid, pipeline_name, expid, priority):
self.rid = rid
self.pipeline_name = pipeline_name
self.expid = expid
self.priority = priority
@@ -182,7 +186,7 @@ def main():
expf = expid["file"]
exp = get_exp(expf, expid["class_name"])
device_mgr.virtual_devices["scheduler"].set_run_info(
obj["pipeline_name"], expid, obj["priority"])
rid, obj["pipeline_name"], expid, obj["priority"])
exp_inst = exp(device_mgr, dataset_mgr,
**expid["arguments"])
put_object({"action": "completed"})
14 changes: 12 additions & 2 deletions doc/manual/management_system.rst
Original file line number Diff line number Diff line change
@@ -111,8 +111,18 @@ Push commits containing experiments to the bare repository using e.g. Git over S

The GUI always runs experiments from the repository. The command-line client, by default, runs experiments from the raw filesystem (which is useful for iterating rapidly without creating many disorganized commits). If you want to use the repository instead, simply pass the ``-R`` option.

Reference
*********
Scheduler API reference
***********************

The scheduler is exposed to the experiments via a virtual device called ``scheduler``. It can be requested like any regular device, and then the methods below can be called on the returned object.

The scheduler virtual device also has the attributes ``rid``, ``pipeline_name``, ``priority`` and ``expid``, which hold the corresponding information about the current run.

.. autoclass:: artiq.master.scheduler.Scheduler
:members:

Front-end tool reference
************************

.. argparse::
:ref: artiq.frontend.artiq_master.get_argparser
18 changes: 18 additions & 0 deletions examples/master/repository/terminate_all.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from artiq import *


class TerminateAll(EnvExperiment):
    # Example experiment: ends every run reported by the scheduler except
    # the run executing this experiment. A "#" comment is used here rather
    # than a class docstring so that no new ``__doc__`` is introduced on the
    # experiment class.

    def build(self):
        """Request the ``scheduler`` device and declare the boolean
        ``graceful_termination`` argument (default ``True``)."""
        self.setattr_device("scheduler")
        self.setattr_argument("graceful_termination", BooleanValue(True))

    def run(self):
        """Terminate all runs listed by the scheduler other than our own.

        When ``graceful_termination`` is true, termination is requested via
        ``scheduler.request_termination``; otherwise ``scheduler.delete``
        is used.
        """
        terminate = (self.scheduler.request_termination
                     if self.graceful_termination
                     else self.scheduler.delete)

        print("our RID", self.scheduler.rid)
        # The keys of the status dictionary are compared against our RID,
        # so that this experiment does not terminate itself.
        for rid in self.scheduler.get_status():
            if rid == self.scheduler.rid:
                continue
            terminate(rid)