Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 291ca0bf8d22
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 96a01efc48bd
Choose a head ref
  • 2 commits
  • 1 file changed
  • 1 contributor

Commits on Feb 7, 2015

  1. Copy the full SHA
    e7d85c5 View commit details
  2. Copy the full SHA
    96a01ef View commit details
Showing with 98 additions and 27 deletions.
  1. +98 −27 artiq/frontend/artiq_ctlmgr.py
125 changes: 98 additions & 27 deletions artiq/frontend/artiq_ctlmgr.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import argparse
import os
import logging
import signal

from artiq.protocols.sync_struct import Subscriber
from artiq.tools import verbosity_args, init_logger
@@ -24,75 +25,145 @@ def get_argparser():
parser.add_argument(
"--retry-master", default=5.0, type=float,
help="retry timer for reconnecting to master")
parser.add_argument(
"--retry-command", default=5.0, type=float,
help="retry timer for restarting a controller command")
return parser


class Controller:
def __init__(self, name, command, retry):
self.launch_task = asyncio.Task(self.launcher(name, command, retry))

@asyncio.coroutine
def end(self):
self.launch_task.cancel()
yield from asyncio.wait_for(self.launch_task, None)

@asyncio.coroutine
def launcher(self, name, command, retry):
process = None
try:
while True:
logger.info("Starting controller %s with command: %s",
name, command)
process = yield from asyncio.create_subprocess_exec(*command.split())
yield from asyncio.shield(process.wait())
logger.warning("Controller %s exited", name)
logger.warning("Restarting in %.1f seconds", retry)
yield from asyncio.sleep(retry)
except asyncio.CancelledError:
logger.info("Terminating controller %s", name)
if process is not None and process.returncode is None:
process.send_signal(signal.SIGTERM)
logger.debug("Signal sent")
try:
yield from asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning("Controller %s did not respond to SIGTERM",
name)
process.send_signal(signal.SIGKILL)


class Controllers:
def __init__(self):
def __init__(self, retry_command):
self.retry_command = retry_command
self.host_filter = None
self.active_or_queued = set()
self.queue = asyncio.Queue()
self.active = dict()
self.process_task = asyncio.Task(self._process())

@asyncio.coroutine
def _process(self):
while True:
action, param = yield from self.queue.get()
if action == "set":
k, command = param
if k in self.active:
yield from self.active[k].end()
self.active[k] = Controller(k, command, self.retry_command)
elif action == "del":
yield from self.active[param].end()
del self.active[param]
else:
raise ValueError

def __setitem__(self, k, v):
if (isinstance(v, dict) and v["type"] == "controller"
and v["host"] == self.host_filter):
command = v["command"].format(bind=self.host_filter,
command = v["command"].format(name=k,
bind=self.host_filter,
port=v["port"])
print("start controller {}: {}".format(k, command))
self.queue.put_nowait(("set", (k, command)))
self.active_or_queued.add(k)

def __delitem__(self, k):
print("del {}".format(k))
if k in self.active_or_queued:
self.queue.put_nowait(("del", k))
self.active_or_queued.remove(k)

def delete_all(self):
print("delete all")
for name in set(self.active_or_queued):
del self[name]


class ControllerDB:
def __init__(self):
self.current_controllers = Controllers()
def __init__(self, retry_command):
self.current_controllers = Controllers(retry_command)

def set_host_filter(self, host_filter):
self.current_controllers.host_filter = host_filter

def sync_struct_init(self, init):
if self.current_controllers is not None:
self.current_controllers.delete_all()
for k, v in init.items():
self.current_controllers[k] = v
return self.current_controllers


def main():
args = get_argparser().parse_args()
init_logger(args)

controller_db = ControllerDB()

if os.name == "nt":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
@asyncio.coroutine
def ctlmgr(server, port, retry_master, retry_command):
controller_db = ControllerDB(retry_command)
try:
subscriber = Subscriber("devices", controller_db.sync_struct_init)
while True:
try:
def set_host_filter():
s = subscriber.writer.get_extra_info("socket")
localhost = s.getsockname()[0]
controller_db.current_controllers.host_filter = localhost
loop.run_until_complete(
subscriber.connect(args.server, args.port,
set_host_filter))
controller_db.set_host_filter(localhost)
yield from subscriber.connect(server, port, set_host_filter)
try:
loop.run_until_complete(subscriber.receive_task)
yield from asyncio.wait_for(subscriber.receive_task, None)
finally:
loop.run_until_complete(subscriber.close())
yield from subscriber.close()
except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError) as e:
logger.warning("Connection to master failed (%s: %s)",
e.__class__.__name__, str(e))
else:
logger.warning("Connection to master lost")
logger.warning("Retrying in %.1f seconds", args.retry_master)
loop.run_until_complete(asyncio.sleep(args.retry_master))
logger.warning("Retrying in %.1f seconds", retry_master)
yield from asyncio.sleep(retry_master)
finally:
loop.close()
controller_db.current_controllers.delete_all()


def main():
args = get_argparser().parse_args()
init_logger(args)
if os.name == "nt":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(ctlmgr(args.server, args.port,
args.retry_master, args.retry_command))
finally:
loop.close()

if __name__ == "__main__":
main()