Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: f2134fa4b230
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 5ca4821a2989
Choose a head ref
  • 2 commits
  • 7 files changed
  • 1 contributor

Commits on Mar 11, 2015

  1. master: watchdog support

    Introduces a watchdog context manager to use in the experiment code that
    terminates the process with an error if it times out. The syntax is:
    
    with self.scheduler.watchdog(20*s):
       ...
    
    Watchdog timers are implemented by the master process (and the worker
    communicates the necessary information about them) so that they can be
    enforced even if the worker crashes. They can be nested arbitrarily.
    During yields, all watchdog timers for the yielding worker are
    suspended [TODO]. Setting up watchdogs is not supported in kernels;
    however, a kernel can be called within watchdog contexts (and terminating
    the worker will terminate the kernel [TODO]).
    
    It is possible to implement a heartbeat mechanism using a watchdog, e.g.:
    
    for i in range(...):
        with self.scheduler.watchdog(...):
            ...
    
    Crashes/freezes within the iterator or the loop management would not be
    detected, but they should be rare enough.
    sbourdeauducq committed Mar 11, 2015
    Copy the full SHA
    d5795fd View commit details
  2. Copy the full SHA
    5ca4821 View commit details
Showing with 110 additions and 29 deletions.
  1. +2 −1 artiq/frontend/artiq_ctlmgr.py
  2. +1 −1 artiq/frontend/artiq_master.py
  3. +13 −0 artiq/frontend/artiq_run.py
  4. +7 −6 artiq/master/scheduler.py
  5. +56 −21 artiq/master/worker.py
  6. +15 −0 artiq/master/worker_impl.py
  7. +16 −0 artiq/tools.py
3 changes: 2 additions & 1 deletion artiq/frontend/artiq_ctlmgr.py
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@

from artiq.protocols.sync_struct import Subscriber
from artiq.tools import verbosity_args, init_logger
from artiq.tools import asyncio_process_wait_timeout


logger = logging.getLogger(__name__)
@@ -65,7 +66,7 @@ def launcher(self, name, command, retry):
process.send_signal(signal.SIGTERM)
logger.debug("Signal sent")
try:
yield from asyncio.wait_for(process.wait(), timeout=5.0)
yield from asyncio_process_wait_timeout(process, 5.0)
except asyncio.TimeoutError:
logger.warning("Controller %s did not respond to SIGTERM",
name)
2 changes: 1 addition & 1 deletion artiq/frontend/artiq_master.py
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ def main():
def run_cb(rid, run_params):
rtr.current_group = run_params["rtr_group"]
scheduler = Scheduler(run_cb, get_last_rid() + 1)
scheduler.worker.handlers = {
scheduler.worker_handlers = {
"req_device": ddb.request,
"req_parameter": pdb.request,
"set_parameter": pdb.set,
13 changes: 13 additions & 0 deletions artiq/frontend/artiq_run.py
Original file line number Diff line number Diff line change
@@ -33,6 +33,17 @@ def set(self, timestamp, name, value):
print("Parameter change: {} -> {}".format(name, value))


class DummyWatchdog:
    """No-op stand-in for ``scheduler.watchdog`` when running without a master.

    The requested timeout is accepted and ignored. Entering and leaving the
    context do nothing, and since ``__exit__`` returns None, exceptions
    raised inside the ``with`` block propagate unchanged.
    """

    def __init__(self, t):
        """Accept (and ignore) the watchdog timeout *t*."""

    def __enter__(self):
        """Do nothing; ``with DummyWatchdog(t) as x`` binds x to None."""
        return None

    def __exit__(self, type, value, traceback):
        """Do nothing and return None so exceptions are not suppressed."""
        return None


class DummyScheduler:
def __init__(self):
self.next_rid = 0
@@ -57,6 +68,8 @@ def run_timed(self, run_params, next_run):
def cancel_timed(self, trid):
print("Cancelling TRID {}".format(trid))

watchdog = DummyWatchdog


def get_argparser():
parser = argparse.ArgumentParser(
13 changes: 7 additions & 6 deletions artiq/master/scheduler.py
Original file line number Diff line number Diff line change
@@ -7,8 +7,8 @@

class Scheduler:
def __init__(self, run_cb, first_rid):
self.worker_handlers = dict()
self.run_cb = run_cb
self.worker = Worker()
self.next_rid = first_rid
self.queue = Notifier([])
self.queue_modified = asyncio.Event()
@@ -63,16 +63,17 @@ def cancel_timed(self, trid):
@asyncio.coroutine
def _run(self, rid, run_params):
self.run_cb(rid, run_params)
worker = Worker(self.worker_handlers)
try:
yield from self.worker.prepare(rid, run_params)
yield from worker.prepare(rid, run_params)
try:
yield from self.worker.run()
yield from self.worker.analyze()
yield from worker.run()
yield from worker.analyze()
finally:
yield from self.worker.close()
yield from worker.close()
except Exception as e:
print("RID {} failed:".format(rid))
print(e)
print("{}: {}".format(e.__class__.__name__, e))
else:
print("RID {} completed successfully".format(rid))

77 changes: 56 additions & 21 deletions artiq/master/worker.py
Original file line number Diff line number Diff line change
@@ -5,19 +5,46 @@
import time

from artiq.protocols import pyon
from artiq.language.units import strip_unit
from artiq.tools import asyncio_process_wait_timeout


class WorkerTimeout(Exception):
    """Raised when sending data to, or receiving data from, the worker times out."""


class WorkerWatchdogTimeout(Exception):
    """Raised when run/analyze times out while handling worker requests.

    The receive timeout in that phase is the time left until the earliest
    registered watchdog deadline, so this normally means a watchdog expired.
    """


class WorkerError(Exception):
    """Raised for worker communication failures other than timeouts.

    Examples are a failed send or the worker exiting while data is expected.
    """


class Worker:
def __init__(self,
def __init__(self, handlers,
send_timeout=0.5, prepare_timeout=15.0, term_timeout=1.0):
self.handlers = dict()
self.handlers = handlers
self.send_timeout = send_timeout
self.prepare_timeout = prepare_timeout
self.term_timeout = term_timeout
self.watchdogs = dict() # wid -> expiration (using time.monotonic)

def create_watchdog(self, t):
    """Register a watchdog expiring *t* from now and return its id.

    *t* is converted with ``strip_unit(t, "s")``. The deadline is stored as a
    ``time.monotonic()`` value. The id is picked from the integers
    0..len(watchdogs) that are not currently in use, so ids of deleted
    watchdogs get reused.
    """
    in_use = set(self.watchdogs.keys())
    candidates = set(range(len(self.watchdogs) + 1))
    free_ids = candidates - in_use
    new_id = next(iter(free_ids))
    deadline = time.monotonic() + strip_unit(t, "s")
    self.watchdogs[new_id] = deadline
    return new_id

def delete_watchdog(self, wid):
del self.watchdogs[wid]

def watchdog_time(self):
if self.watchdogs:
return min(self.watchdogs.values()) - time.monotonic()
else:
return None

@asyncio.coroutine
def _create_process(self):
@@ -39,8 +66,8 @@ def close(self):
self.process.kill()
return
try:
yield from asyncio.wait_for(
self.process.wait(), timeout=self.term_timeout)
yield from asyncio_process_wait_timeout(self.process,
self.term_timeout)
except asyncio.TimeoutError:
self.process.kill()

@@ -54,7 +81,7 @@ def _send(self, obj, timeout):
if fut is not (): # FIXME: why does Python return this?
yield from asyncio.wait_for(fut, timeout=timeout)
except asyncio.TimeoutError:
raise WorkerError("Timeout sending data from worker")
raise WorkerTimeout("Timeout sending data from worker")
except:
raise WorkerError("Failed to send data to worker")

@@ -64,7 +91,7 @@ def _recv(self, timeout):
line = yield from asyncio.wait_for(
self.process.stdout.readline(), timeout=timeout)
except asyncio.TimeoutError:
raise WorkerError("Timeout receiving data from worker")
raise WorkerTimeout("Timeout receiving data from worker")
if not line:
raise WorkerError("Worker ended while attempting to receive data")
try:
@@ -74,20 +101,21 @@ def _recv(self, timeout):
return obj

@asyncio.coroutine
def _handle_worker_requests(self, timeout):
if timeout is None:
end_time = None
else:
end_time = time.monotonic() + timeout
def _handle_worker_requests(self, timeout_func):
while True:
obj = yield from self._recv(None if end_time is None
else end_time - time.monotonic())
obj = yield from self._recv(timeout_func())
action = obj["action"]
if action == "completed":
return
del obj["action"]
if action == "create_watchdog":
func = self.create_watchdog
elif action == "delete_watchdog":
func = self.delete_watchdog
else:
func = self.handlers[action]
try:
data = self.handlers[action](**obj)
data = func(**obj)
reply = {"status": "ok", "data": data}
except:
reply = {"status": "failed",
@@ -100,19 +128,26 @@ def prepare(self, rid, run_params):
try:
obj = {"action": "prepare", "rid": rid, "run_params": run_params}
yield from self._send(obj, self.send_timeout)
yield from self._handle_worker_requests(self.prepare_timeout)
end_time = time.monotonic() + self.prepare_timeout
yield from self._handle_worker_requests(
lambda: end_time - time.monotonic())
except:
yield from self.close()
raise

@asyncio.coroutine
def run(self):
obj = {"action": "run"}
def _run_analyze(self, action):
obj = {"action": action}
yield from self._send(obj, self.send_timeout)
yield from self._handle_worker_requests(None)
try:
yield from self._handle_worker_requests(self.watchdog_time)
except WorkerTimeout:
raise WorkerWatchdogTimeout

@asyncio.coroutine
def run(self):
yield from self._run_analyze("run")

@asyncio.coroutine
def analyze(self):
obj = {"action": "analyze"}
yield from self._send(obj, self.send_timeout)
yield from self._handle_worker_requests(None)
yield from self._run_analyze("analyze")
15 changes: 15 additions & 0 deletions artiq/master/worker_impl.py
Original file line number Diff line number Diff line change
@@ -51,12 +51,27 @@ class ParentPDB:
update_rt_results = make_parent_action("update_rt_results", "mod")


class Watchdog:
    """Context manager that arms a watchdog for the enclosed experiment code.

    On entry it calls the "create_watchdog" parent action with the timeout
    *t* and keeps the returned id. On exit it calls "delete_watchdog" with
    that id. ``__exit__`` returns None, so exceptions are not suppressed.
    """
    _create = make_parent_action("create_watchdog", "t")
    _delete = make_parent_action("delete_watchdog", "wid")

    def __init__(self, t):
        self.t = t

    def __enter__(self):
        watchdog_id = Watchdog._create(self.t)
        self.wid = watchdog_id

    def __exit__(self, type, value, traceback):
        Watchdog._delete(self.wid)


class Scheduler:
    """Scheduler interface exposed to experiment code inside the worker.

    Each method is built with make_parent_action and mapped to the named
    parent action, e.g. "scheduler_run_queued". ``watchdog`` is the
    Watchdog context-manager class.
    """
    run_queued = make_parent_action("scheduler_run_queued", "run_params")
    cancel_queued = make_parent_action("scheduler_cancel_queued", "rid")
    run_timed = make_parent_action(
        "scheduler_run_timed", "run_params next_run")
    cancel_timed = make_parent_action("scheduler_cancel_timed", "trid")
    watchdog = Watchdog


def get_exp(file, exp):
16 changes: 16 additions & 0 deletions artiq/tools.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,8 @@
import linecache
import logging
import sys
import asyncio
import time
import os.path


@@ -58,3 +60,17 @@ def simple_network_args(parser, default_port):

def init_logger(args):
    """Configure root logging from the -v/-q counts in *args*.

    The level starts at WARNING. Each ``quiet`` step raises it by 10 and
    each ``verbose`` step lowers it by 10.
    """
    level = logging.WARNING + 10 * (args.quiet - args.verbose)
    logging.basicConfig(level=level)


@asyncio.coroutine
def asyncio_process_wait_timeout(process, timeout):
    """Wait until the asyncio subprocess *process* has actually exited.

    Raises asyncio.TimeoutError if the process is still running *timeout*
    seconds after the call. If the process's stdout is a pipe, it is first
    drained and the data discarded, so a child blocked writing to a full
    pipe can make progress and exit.
    """
    # In Python < 3.5, asyncio.wait_for(process.wait(), ...
    # causes a futures.InvalidStateError inside asyncio if and when the
    # process terminates after the timeout.
    # Work around this problem by never wrapping process.wait() in
    # wait_for(): poll process.returncode instead, which stays None until
    # the process has terminated.
    end_time = time.monotonic() + timeout
    if process.stdout is not None:
        while True:
            data = yield from asyncio.wait_for(
                process.stdout.read(1024),
                timeout=end_time - time.monotonic())
            if not data:
                # EOF on stdout does not imply the process has exited
                # (it may just have closed the pipe), so go on polling.
                break
    while process.returncode is None:
        remaining = end_time - time.monotonic()
        if remaining <= 0:
            raise asyncio.TimeoutError
        yield from asyncio.sleep(min(0.1, remaining))