Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 5ca4821a2989
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: f158711f7eda
Choose a head ref
  • 4 commits
  • 5 files changed
  • 1 contributor

Commits on Mar 11, 2015

  1. Copy the full SHA
    e037b93 View commit details
  2. test: do not close/recreate the asyncio event loop (WA for asyncio bugs when multiple tests are run)
    sbourdeauducq committed Mar 11, 2015
    Copy the full SHA
    4ba54ac View commit details
  3. Copy the full SHA
    43a05c7 View commit details
  4. Copy the full SHA
    f158711 View commit details
Showing with 108 additions and 25 deletions.
  1. +1 −0 artiq/master/scheduler.py
  2. +37 −21 artiq/master/worker.py
  3. +2 −0 artiq/master/worker_impl.py
  4. +1 −4 artiq/test/pc_rpc.py
  5. +67 −0 artiq/test/worker.py
1 change: 1 addition & 0 deletions artiq/master/scheduler.py
Original file line number Diff line number Diff line change
@@ -69,6 +69,7 @@ def _run(self, rid, run_params):
try:
yield from worker.run()
yield from worker.analyze()
yield from worker.write_results()
finally:
yield from worker.close()
except Exception as e:
58 changes: 37 additions & 21 deletions artiq/master/worker.py
Original file line number Diff line number Diff line change
@@ -23,15 +23,20 @@ class WorkerError(Exception):

class Worker:
def __init__(self, handlers,
send_timeout=0.5, prepare_timeout=15.0, term_timeout=1.0):
send_timeout=0.5, term_timeout=1.0,
prepare_timeout=15.0, results_timeout=15.0):
self.handlers = handlers
self.send_timeout = send_timeout
self.prepare_timeout = prepare_timeout
self.term_timeout = term_timeout
self.prepare_timeout = prepare_timeout
self.results_timeout = results_timeout
self.watchdogs = dict() # wid -> expiration (using time.monotonic)

def create_watchdog(self, t):
avail = set(range(len(self.watchdogs) + 1)) \
n_user_watchdogs = len(self.watchdogs)
if -1 in self.watchdogs:
n_user_watchdogs -= 1
avail = set(range(n_user_watchdogs + 1)) \
- set(self.watchdogs.keys())
wid = next(iter(avail))
self.watchdogs[wid] = time.monotonic() + strip_unit(t, "s")
@@ -101,9 +106,12 @@ def _recv(self, timeout):
return obj

@asyncio.coroutine
def _handle_worker_requests(self, timeout_func):
def _handle_worker_requests(self):
while True:
obj = yield from self._recv(timeout_func())
try:
obj = yield from self._recv(self.watchdog_time())
except WorkerTimeout:
raise WorkerWatchdogTimeout
action = obj["action"]
if action == "completed":
return
@@ -122,32 +130,40 @@ def _handle_worker_requests(self, timeout_func):
"message": traceback.format_exc()}
yield from self._send(reply, self.send_timeout)

@asyncio.coroutine
def _worker_action(self, obj, timeout=None):
    """Send *obj* to the worker and service its requests until done.

    When *timeout* is not None, an entry with key ``-1`` is stored in
    ``self.watchdogs`` that expires *timeout* seconds from now (measured
    with ``time.monotonic``); the entry is removed again when the action
    ends, whether it succeeded or raised.

    Raises ``WorkerWatchdogTimeout`` if ``_handle_worker_requests`` lets a
    ``WorkerTimeout`` escape.
    """
    has_deadline = timeout is not None
    if has_deadline:
        # Key -1 holds this action's own deadline in the watchdog table.
        self.watchdogs[-1] = time.monotonic() + timeout
    try:
        yield from self._send(obj, self.send_timeout)
        try:
            yield from self._handle_worker_requests()
        except WorkerTimeout:
            raise WorkerWatchdogTimeout
    finally:
        if has_deadline:
            del self.watchdogs[-1]

@asyncio.coroutine
def prepare(self, rid, run_params):
yield from self._create_process()
try:
obj = {"action": "prepare", "rid": rid, "run_params": run_params}
yield from self._send(obj, self.send_timeout)
end_time = time.monotonic() + self.prepare_timeout
yield from self._handle_worker_requests(
lambda: end_time - time.monotonic())
yield from self._worker_action(
{"action": "prepare", "rid": rid, "run_params": run_params},
self.prepare_timeout)
except:
yield from self.close()
raise

@asyncio.coroutine
def _run_analyze(self, action):
obj = {"action": action}
yield from self._send(obj, self.send_timeout)
try:
yield from self._handle_worker_requests(self.watchdog_time)
except WorkerTimeout:
raise WorkerWatchdogTimeout

@asyncio.coroutine
def run(self):
yield from self._run_analyze("run")
yield from self._worker_action({"action": "run"})

@asyncio.coroutine
def analyze(self):
yield from self._run_analyze("analyze")
yield from self._worker_action({"action": "analyze"})

@asyncio.coroutine
def write_results(self):
    """Issue the ``write_results`` action, bounded by ``self.results_timeout``."""
    request = {"action": "write_results"}
    yield from self._worker_action(request, self.results_timeout)
2 changes: 2 additions & 0 deletions artiq/master/worker_impl.py
Original file line number Diff line number Diff line change
@@ -118,6 +118,8 @@ def main():
put_object({"action": "completed"})
elif action == "analyze":
exp_inst.analyze()
put_object({"action": "completed"})
elif action == "write_results":
f = get_hdf5_output(start_time, rid, exp.__name__)
try:
rdb.write_hdf5(f)
5 changes: 1 addition & 4 deletions artiq/test/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -74,10 +74,7 @@ def _asyncio_echo(self):

def _loop_asyncio_echo(self):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(self._asyncio_echo())
finally:
loop.close()
loop.run_until_complete(self._asyncio_echo())

def test_asyncio_echo(self):
self._run_server_and_test(self._loop_asyncio_echo)
67 changes: 67 additions & 0 deletions artiq/test/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import unittest
import asyncio
import sys
from time import sleep

from artiq import *
from artiq.master.worker import *


class WatchdogNoTimeout(Experiment, AutoDB):
    """Experiment whose work stays inside each watchdog it opens."""

    def run(self):
        # Ten rounds: each sleeps 0.1 under a freshly created 0.5*s watchdog.
        for _ in range(10):
            with self.scheduler.watchdog(0.5*s):
                sleep(0.1)


class WatchdogTimeout(Experiment, AutoDB):
    """Experiment that sleeps far longer than its watchdog allows in ``run``."""

    def run(self):
        watchdog = self.scheduler.watchdog(0.1*s)
        with watchdog:
            # Sleeps well past the 0.1*s watchdog opened above.
            sleep(100.0)


class WatchdogTimeoutInBuild(Experiment, AutoDB):
    """Experiment that sleeps far longer than its watchdog allows in ``build``.

    NOTE(review): presumably ``build`` is invoked while the worker prepares
    the experiment -- confirm against ``AutoDB``.
    """

    def build(self):
        watchdog = self.scheduler.watchdog(0.1*s)
        with watchdog:
            # Sleeps well past the 0.1*s watchdog opened above.
            sleep(100.0)

    def run(self):
        # Nothing to do here; the overrun happens in build().
        pass


@asyncio.coroutine
def _call_worker(worker, run_params):
    """Drive *worker* through prepare, run and analyze, then close it.

    The worker is prepared with run ID 0.  ``worker.close()`` is awaited
    once preparation has succeeded, even if ``run`` or ``analyze`` raises.
    """
    rid = 0
    yield from worker.prepare(rid, run_params)
    try:
        yield from worker.run()
        yield from worker.analyze()
    finally:
        yield from worker.close()


def _run_experiment(experiment):
    """Run the experiment class named *experiment* from this module in a Worker.

    Builds run parameters that point at this very file, creates a ``Worker``
    with a no-op ``init_rt_results`` handler, and blocks on the current
    asyncio event loop until ``_call_worker`` finishes.  Exceptions raised
    by the worker coroutine propagate to the caller.
    """
    this_file = sys.modules[__name__].__file__
    run_params = {
        "file": this_file,
        "experiment": experiment,
        "arguments": dict(),
    }
    # The handler accepts and discards the description it is given.
    handlers = {"init_rt_results": lambda description: None}

    worker = Worker(handlers)
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(_call_worker(worker, run_params))


class WatchdogCase(unittest.TestCase):
    """Checks how the worker reacts to respected and overrun watchdogs."""

    def test_watchdog_no_timeout(self):
        """An experiment that respects its watchdogs completes without error."""
        _run_experiment("WatchdogNoTimeout")

    def test_watchdog_timeout(self):
        """Overrunning a watchdog in ``run`` raises WorkerWatchdogTimeout."""
        self.assertRaises(WorkerWatchdogTimeout,
                          _run_experiment, "WatchdogTimeout")

    def test_watchdog_timeout_in_build(self):
        """Overrunning a watchdog in ``build`` raises WorkerWatchdogTimeout."""
        self.assertRaises(WorkerWatchdogTimeout,
                          _run_experiment, "WatchdogTimeoutInBuild")