Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 66f82a13d3ca
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 3cf53667c8e2
Choose a head ref
  • 3 commits
  • 8 files changed
  • 1 contributor

Commits on Oct 6, 2015

  1. Copy the full SHA
    d94f021 View commit details
  2. Copy the full SHA
    139072d View commit details
  3. Copy the full SHA
    3cf5366 View commit details
7 changes: 6 additions & 1 deletion artiq/frontend/artiq_client.py
Original file line number Diff line number Diff line change
@@ -58,6 +58,8 @@ def get_argparser():
parser_delete = subparsers.add_parser("delete",
help="delete an experiment "
"from the schedule")
parser_delete.add_argument("-g", action="store_true",
help="request graceful termination")
parser_delete.add_argument("rid", type=int,
help="run identifier (RID)")

@@ -121,7 +123,10 @@ def _action_submit(remote, args):


def _action_delete(remote, args):
remote.delete(args.rid)
if args.g:
remote.request_termination(args.rid)
else:
remote.delete(args.rid)


def _action_set_parameter(remote, args):
18 changes: 13 additions & 5 deletions artiq/gui/schedule.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import time
from functools import partial

from quamash import QtGui, QtCore
from pyqtgraph import dockarea
@@ -71,10 +72,14 @@ def __init__(self, status_bar, schedule_ctl):
self.addWidget(self.table)

self.table.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu)
request_termination_action = QtGui.QAction("Request termination", self.table)
request_termination_action.triggered.connect(partial(self.delete_clicked, True))
self.table.addAction(request_termination_action)
delete_action = QtGui.QAction("Delete", self.table)
delete_action.triggered.connect(self.delete_clicked)
delete_action.triggered.connect(partial(self.delete_clicked, False))
self.table.addAction(delete_action)


async def sub_connect(self, host, port):
self.subscriber = Subscriber("schedule", self.init_schedule_model)
await self.subscriber.connect(host, port)
@@ -87,13 +92,16 @@ def init_schedule_model(self, init):
self.table.setModel(self.table_model)
return self.table_model

async def delete(self, rid):
await self.schedule_ctl.delete(rid)
async def delete(self, rid, graceful):
if graceful:
await self.schedule_ctl.request_termination(rid)
else:
await self.schedule_ctl.delete(rid)

def delete_clicked(self):
def delete_clicked(self, graceful):
idx = self.table.selectedIndexes()
if idx:
row = idx[0].row()
rid = self.table_model.row_to_key[row]
self.status_bar.showMessage("Deleted RID {}".format(rid))
asyncio.ensure_future(self.delete(rid))
asyncio.ensure_future(self.delete(rid, graceful))
8 changes: 7 additions & 1 deletion artiq/language/core.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,8 @@
from functools import wraps


__all__ = ["int64", "round64", "kernel", "portable",
__all__ = ["int64", "round64", "TerminationRequested",
"kernel", "portable",
"set_time_manager", "set_syscall_manager", "set_watchdog_factory",
"RuntimeException", "EncodedException"]

@@ -77,6 +78,11 @@ def round64(x):
return int64(round(x))


class TerminationRequested(Exception):
"""Raised by ``pause`` when the user has requested termination."""
pass


_KernelFunctionInfo = namedtuple("_KernelFunctionInfo", "core_name k_function")


18 changes: 17 additions & 1 deletion artiq/master/scheduler.py
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@ def __init__(self, rid, pipeline_name,
self.flush = flush

self.worker = Worker(pool.worker_handlers)
self.termination_requested = False

self._status = RunStatus.pending

@@ -267,7 +268,12 @@ async def _do(self):
try:
if run.status == RunStatus.paused:
run.status = RunStatus.running
completed = await run.resume()
# clear "termination requested" flag now
# so that if it is set again during the resume, this
# results in another exception.
request_termination = run.termination_requested
run.termination_requested = False
completed = await run.resume(request_termination)
else:
run.status = RunStatus.running
completed = await run.run()
@@ -422,3 +428,13 @@ def submit(self, pipeline_name, expid, priority, due_date, flush):

def delete(self, rid):
self._deleter.delete(rid)

def request_termination(self, rid):
for pipeline in self._pipelines.values():
if rid in pipeline.pool.runs:
run = pipeline.pool.runs[rid]
if run.status == RunStatus.running or run.status == RunStatus.paused:
run.termination_requested = True
else:
self.delete(rid)
break
4 changes: 2 additions & 2 deletions artiq/master/worker.py
Original file line number Diff line number Diff line change
@@ -221,12 +221,12 @@ async def run(self):
self.yield_time = time.monotonic()
return completed

async def resume(self):
async def resume(self, request_termination):
stop_duration = time.monotonic() - self.yield_time
for wid, expiry in self.watchdogs:
self.watchdogs[wid] += stop_duration
completed = await self._worker_action({"status": "ok",
"data": None})
"data": request_termination})
if not completed:
self.yield_time = time.monotonic()
return completed
8 changes: 6 additions & 2 deletions artiq/master/worker_impl.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
from artiq.tools import file_import
from artiq.master.worker_db import DeviceManager, ResultDB, get_hdf5_output
from artiq.language.environment import is_experiment
from artiq.language.core import set_watchdog_factory
from artiq.language.core import set_watchdog_factory, TerminationRequested


def get_object():
@@ -93,7 +93,11 @@ def __exit__(self, type, value, traceback):


class Scheduler:
pause = staticmethod(make_parent_action("pause", ""))
pause_noexc = staticmethod(make_parent_action("pause", ""))

def pause(self):
if self.pause_noexc():
raise TerminationRequested

submit = staticmethod(make_parent_action("scheduler_submit",
"pipeline_name expid priority due_date flush"))
49 changes: 35 additions & 14 deletions artiq/test/scheduler.py
Original file line number Diff line number Diff line change
@@ -21,9 +21,12 @@ def build(self):
self.setattr_device("scheduler")

def run(self):
while True:
self.scheduler.pause()
sleep(0.2)
try:
while True:
self.scheduler.pause()
sleep(0.2)
except TerminationRequested:
self.set_parameter("termination_ok", True)


def _get_expid(name):
@@ -57,11 +60,6 @@ def _get_basic_steps(rid, expid, priority=0, flush=False):
]


_handlers = {
"init_rt_results": lambda description: None
}


class SchedulerCase(unittest.TestCase):
def setUp(self):
if os.name == "nt":
@@ -72,7 +70,7 @@ def setUp(self):

def test_steps(self):
loop = self.loop
scheduler = Scheduler(0, _handlers, None)
scheduler = Scheduler(0, dict(), None)
expid = _get_expid("EmptyExperiment")

expect = _get_basic_steps(1, expid)
@@ -108,13 +106,25 @@ def notify(mod):

def test_pause(self):
loop = self.loop
scheduler = Scheduler(0, _handlers, None)

termination_ok = False
def check_termination(key, value):
nonlocal termination_ok
self.assertEqual(key, "termination_ok")
self.assertEqual(value, True)
termination_ok = True
handlers = {
"set_parameter": check_termination
}
scheduler = Scheduler(0, handlers, None)

expid_bg = _get_expid("BackgroundExperiment")
expid = _get_expid("EmptyExperiment")

expect = _get_basic_steps(1, expid)
background_running = asyncio.Event()
done = asyncio.Event()
empty_completed = asyncio.Event()
background_completed = asyncio.Event()
expect_idx = 0
def notify(mod):
nonlocal expect_idx
@@ -123,23 +133,34 @@ def notify(mod):
"key": "status",
"action": "setitem"}:
background_running.set()
if mod == {"path": [0],
"value": "deleting",
"key": "status",
"action": "setitem"}:
background_completed.set()
if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1):
self.assertEqual(mod, expect[expect_idx])
expect_idx += 1
if expect_idx >= len(expect):
done.set()
empty_completed.set()
scheduler.notifier.publish = notify

scheduler.start()
scheduler.submit("main", expid_bg, -99, None, False)
loop.run_until_complete(background_running.wait())
scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(done.wait())
loop.run_until_complete(empty_completed.wait())

self.assertFalse(termination_ok)
scheduler.request_termination(0)
loop.run_until_complete(background_completed.wait())
self.assertTrue(termination_ok)

loop.run_until_complete(scheduler.stop())

def test_flush(self):
loop = self.loop
scheduler = Scheduler(0, _handlers, None)
scheduler = Scheduler(0, dict(), None)
expid = _get_expid("EmptyExperiment")

expect = _get_basic_steps(1, expid, 1, True)
18 changes: 18 additions & 0 deletions examples/master/repository/run_forever.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from itertools import count
from time import sleep

from artiq import *


class RunForever(EnvExperiment):
def build(self):
self.setattr_device("scheduler")

def run(self):
try:
for i in count():
self.scheduler.pause()
sleep(1)
print("ping", i)
except TerminationRequested:
print("Terminated gracefully")