Comparing changes

base repository: m-labs/artiq
base: 022a1ffd0905
head repository: m-labs/artiq
compare: 627221a5cde2
  • 9 commits
  • 7 files changed
  • 1 contributor

Commits on Jan 26, 2016

  1. cda4a07
  2. lda: test tweaks

    jordens committed Jan 26, 2016
    ccac852

Commits on Jan 27, 2016

  1. f78eecb
  2. Merge branch 'master' into testbench-controllers

    * master:
      gui/experiments: float/bring into focus already open docks when opening experiments
      gui: reduce size of console dock
      protocols/logging,pc_rpc: do not print errors on Windows when clients disconnect
      gui: reduce size of schedule dock
      worker: Windows VMs are slow, increase send_timeout
      protocol/sync_struct: Windows also raises ConnectionAbortedError on disconnection
      gui: reduce size of log dock
      gui: reduce size of experiment dock
      protocols/logging/LogParser: handle Windows CRLF
      pyon: handle \r
      test/pipe_ipc: re-enable
      protocols/asyncio_server: minor cleanup
      protocols/pipe_ipc: Windows support
      Revert "Revert "test/pipe_ipc: temporarily skip test""
      Revert "try debugging weird unittest failure"
      try debugging weird unittest failure
      conda: restrict binutils-or1k-linux dependency to linux.
      transforms.iodelay_estimator: make diagnostics much more clear.
      Fix typo.
    jordens committed Jan 27, 2016
    99f7889

Commits on Jan 28, 2016

  1. 1b7020d
  2. 982fbb0
  3. ab5e8fd
  4. bb1db7d
  5. Merge branch 'testbench-controllers'

    closes #69
    
    * testbench-controllers:
      test.ctlmgr: add basic test tooling
      hardware_testbench: fix timeout handling
      hardware_testbench: use plain subprocess to start controllers
      hardware_testbench: run Crontrollers loop in thread, not the test
      hardware_testbench: run controllers
      lda: test tweaks
      artiq_ctlmgr: refactor into artiq.devices.ctlmgr
    jordens committed Jan 28, 2016
    627221a
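
The "run Controllers loop in thread, not the test" change listed above refers to driving an asyncio event loop from a background thread so the synchronous test keeps the main thread. As a hypothetical illustration of that general pattern (not the actual hardware_testbench code), using only the standard library:

    import asyncio
    import threading

    # Run a dedicated event loop in a daemon thread; the test code stays
    # synchronous on the main thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    # Coroutines are then scheduled onto that loop from the test thread:
    #   future = asyncio.run_coroutine_threadsafe(some_coroutine(), loop)
    #   result = future.result(timeout=10)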
Showing with 394 additions and 278 deletions.
  1. +255 −0 artiq/devices/ctlmgr.py
  2. +7 −242 artiq/frontend/artiq_ctlmgr.py
  3. +70 −0 artiq/test/ctlmgr.py
  4. +40 −18 artiq/test/hardware_testbench.py
  5. +8 −7 artiq/test/lda.py
  6. +5 −4 artiq/test/novatech409b.py
  7. +9 −7 artiq/test/thorlabs_tcube.py
255 changes: 255 additions & 0 deletions artiq/devices/ctlmgr.py
@@ -0,0 +1,255 @@
import asyncio
import logging
import subprocess
import shlex
import socket

from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient
from artiq.protocols.logging import parse_log_message, log_with_name
from artiq.tools import Condition, TaskObject


logger = logging.getLogger(__name__)


class Controller:
    def __init__(self, name, ddb_entry):
        self.name = name
        self.command = ddb_entry["command"]
        self.retry_timer = ddb_entry.get("retry_timer", 5)
        self.retry_timer_backoff = ddb_entry.get("retry_timer_backoff", 1.1)

        self.host = ddb_entry["host"]
        self.port = ddb_entry["port"]
        self.ping_timer = ddb_entry.get("ping_timer", 30)
        self.ping_timeout = ddb_entry.get("ping_timeout", 30)
        self.term_timeout = ddb_entry.get("term_timeout", 30)

        self.retry_timer_cur = self.retry_timer
        self.retry_now = Condition()
        self.process = None
        self.launch_task = asyncio.Task(self.launcher())

    async def end(self):
        self.launch_task.cancel()
        await asyncio.wait_for(self.launch_task, None)

    async def call(self, method, *args, **kwargs):
        remote = AsyncioClient()
        await remote.connect_rpc(self.host, self.port, None)
        try:
            targets, _ = remote.get_rpc_id()
            remote.select_rpc_target(targets[0])
            r = await getattr(remote, method)(*args, **kwargs)
        finally:
            remote.close_rpc()
        return r

    async def _ping(self):
        try:
            ok = await asyncio.wait_for(self.call("ping"),
                                        self.ping_timeout)
            if ok:
                self.retry_timer_cur = self.retry_timer
            return ok
        except:
            return False

    async def _wait_and_ping(self):
        while True:
            try:
                await asyncio.wait_for(self.process.wait(),
                                       self.ping_timer)
            except asyncio.TimeoutError:
                logger.debug("pinging controller %s", self.name)
                ok = await self._ping()
                if not ok:
                    logger.warning("Controller %s ping failed", self.name)
                    await self._terminate()
                    return
            else:
                break

    async def forward_logs(self, stream):
        source = "controller({})".format(self.name)
        while True:
            try:
                entry = (await stream.readline())
                if not entry:
                    break
                entry = entry[:-1]
                level, name, message = parse_log_message(entry.decode())
                log_with_name(name, level, message, extra={"source": source})
            except:
                logger.debug("exception in log forwarding", exc_info=True)
                break
        logger.debug("stopped log forwarding of stream %s of %s",
                     stream, self.name)

    async def launcher(self):
        try:
            while True:
                logger.info("Starting controller %s with command: %s",
                            self.name, self.command)
                try:
                    self.process = await asyncio.create_subprocess_exec(
                        *shlex.split(self.command),
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    asyncio.ensure_future(self.forward_logs(
                        self.process.stdout))
                    asyncio.ensure_future(self.forward_logs(
                        self.process.stderr))
                    await self._wait_and_ping()
                except FileNotFoundError:
                    logger.warning("Controller %s failed to start", self.name)
                else:
                    logger.warning("Controller %s exited", self.name)
                logger.warning("Restarting in %.1f seconds",
                               self.retry_timer_cur)
                try:
                    await asyncio.wait_for(self.retry_now.wait(),
                                           self.retry_timer_cur)
                except asyncio.TimeoutError:
                    pass
                self.retry_timer_cur *= self.retry_timer_backoff
        except asyncio.CancelledError:
            await self._terminate()

    async def _terminate(self):
        logger.info("Terminating controller %s", self.name)
        if self.process is not None and self.process.returncode is None:
            try:
                await asyncio.wait_for(self.call("terminate"),
                                       self.term_timeout)
            except:
                logger.warning("Controller %s did not respond to terminate "
                               "command, killing", self.name)
                try:
                    self.process.kill()
                except ProcessLookupError:
                    pass
            try:
                await asyncio.wait_for(self.process.wait(),
                                       self.term_timeout)
            except:
                logger.warning("Controller %s failed to exit, killing",
                               self.name)
                try:
                    self.process.kill()
                except ProcessLookupError:
                    pass
                await self.process.wait()
        logger.debug("Controller %s terminated", self.name)


def get_ip_addresses(host):
    try:
        addrinfo = socket.getaddrinfo(host, None)
    except:
        return set()
    return {info[4][0] for info in addrinfo}


class Controllers:
    def __init__(self):
        self.host_filter = None
        self.active_or_queued = set()
        self.queue = asyncio.Queue()
        self.active = dict()
        self.process_task = asyncio.Task(self._process())

    async def _process(self):
        while True:
            action, param = await self.queue.get()
            if action == "set":
                k, ddb_entry = param
                if k in self.active:
                    await self.active[k].end()
                self.active[k] = Controller(k, ddb_entry)
            elif action == "del":
                await self.active[param].end()
                del self.active[param]
            self.queue.task_done()
            if action not in ("set", "del"):
                raise ValueError

    def __setitem__(self, k, v):
        if (isinstance(v, dict) and v["type"] == "controller" and
                self.host_filter in get_ip_addresses(v["host"])):
            v["command"] = v["command"].format(name=k,
                                               bind=self.host_filter,
                                               port=v["port"])
            self.queue.put_nowait(("set", (k, v)))
            self.active_or_queued.add(k)

    def __delitem__(self, k):
        if k in self.active_or_queued:
            self.queue.put_nowait(("del", k))
            self.active_or_queued.remove(k)

    def delete_all(self):
        for name in set(self.active_or_queued):
            del self[name]

    async def shutdown(self):
        self.process_task.cancel()
        for c in self.active.values():
            await c.end()


class ControllerDB:
    def __init__(self):
        self.current_controllers = Controllers()

    def set_host_filter(self, host_filter):
        self.current_controllers.host_filter = host_filter

    def sync_struct_init(self, init):
        if self.current_controllers is not None:
            self.current_controllers.delete_all()
        for k, v in init.items():
            self.current_controllers[k] = v
        return self.current_controllers


class ControllerManager(TaskObject):
    def __init__(self, server, port, retry_master):
        self.server = server
        self.port = port
        self.retry_master = retry_master
        self.controller_db = ControllerDB()

    async def _do(self):
        try:
            subscriber = Subscriber("devices",
                                    self.controller_db.sync_struct_init)
            while True:
                try:
                    def set_host_filter():
                        s = subscriber.writer.get_extra_info("socket")
                        localhost = s.getsockname()[0]
                        self.controller_db.set_host_filter(localhost)
                    await subscriber.connect(self.server, self.port,
                                             set_host_filter)
                    try:
                        await asyncio.wait_for(subscriber.receive_task, None)
                    finally:
                        await subscriber.close()
                except (ConnectionAbortedError, ConnectionError,
                        ConnectionRefusedError, ConnectionResetError) as e:
                    logger.warning("Connection to master failed (%s: %s)",
                                   e.__class__.__name__, str(e))
                else:
                    logger.warning("Connection to master lost")
                logger.warning("Retrying in %.1f seconds", self.retry_master)
                await asyncio.sleep(self.retry_master)
        except asyncio.CancelledError:
            pass
        finally:
            await self.controller_db.current_controllers.shutdown()

    def retry_now(self, k):
        """If a controller is disabled and pending retry, perform that retry
        now."""
        self.controller_db.current_controllers.active[k].retry_now.notify()
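
The shape of the device database entries this module consumes can be inferred from the keys read in Controller.__init__ and Controllers.__setitem__ above. A hypothetical sketch of such an entry (the command string, host and port values are illustrative; only the keys and defaults come from the code):

    example_ddb_entry = {
        "type": "controller",        # __setitem__ only accepts "controller" entries
        "host": "::1",               # must resolve to the manager's host_filter address
        "port": 3253,                # illustrative port number
        # {name}, {bind} and {port} are substituted by Controllers.__setitem__:
        "command": "some_controller --bind {bind} -p {port}",
        # optional keys, shown with their defaults from Controller.__init__:
        "retry_timer": 5,            # seconds before a restart; multiplied by
        "retry_timer_backoff": 1.1,  # this factor after each failed attempt
        "ping_timer": 30,
        "ping_timeout": 30,
        "term_timeout": 30,
    }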