Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 3f68d0ba8fbc
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 54b11a392a17
Choose a head ref
  • 3 commits
  • 2 files changed
  • 1 contributor

Commits on Aug 9, 2015

  1. Copy the full SHA
    a21049d View commit details
  2. Copy the full SHA
    4b19566 View commit details
  3. Copy the full SHA
    54b11a3 View commit details
Showing with 56 additions and 23 deletions.
  1. +30 −14 artiq/frontend/artiq_ctlmgr.py
  2. +26 −9 artiq/protocols/pc_rpc.py
44 changes: 30 additions & 14 deletions artiq/frontend/artiq_ctlmgr.py
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@
import argparse
import os
import logging
import signal
import shlex
import socket

@@ -40,12 +39,15 @@ def __init__(self, name, ddb_entry):
self.name = name
self.command = ddb_entry["command"]
self.retry_timer = ddb_entry.get("retry_timer", 5)
self.retry_timer_backoff = ddb_entry.get("retry_timer_backoff", 1.1)

self.host = ddb_entry["host"]
self.port = ddb_entry["port"]
self.ping_timer = ddb_entry.get("ping_timer", 30)
self.ping_timeout = ddb_entry.get("ping_timeout", 30)
self.term_timeout = ddb_entry.get("term_timeout", 30)

self.retry_timer_cur = self.retry_timer
self.process = None
self.launch_task = asyncio.Task(self.launcher())

@@ -55,22 +57,25 @@ def end(self):
yield from asyncio.wait_for(self.launch_task, None)

@asyncio.coroutine
def _ping_notimeout(self):
def _call_controller(self, method):
remote = AsyncioClient()
yield from remote.connect_rpc(self.host, self.port, None)
try:
targets, _ = remote.get_rpc_id()
remote.select_rpc_target(targets[0])
ok = yield from remote.ping()
r = yield from getattr(remote, method)()
finally:
remote.close_rpc()
return ok
return r

@asyncio.coroutine
def _ping(self):
try:
return (yield from asyncio.wait_for(
self._ping_notimeout(), self.ping_timeout))
ok = yield from asyncio.wait_for(self._call_controller("ping"),
self.ping_timeout)
if ok:
self.retry_timer_cur = self.retry_timer
return ok
except:
return False

@@ -87,6 +92,8 @@ def _wait_and_ping(self):
logger.warning("Controller %s ping failed", self.name)
yield from self._terminate()
return
else:
break

@asyncio.coroutine
def launcher(self):
@@ -102,23 +109,32 @@ def launcher(self):
logger.warning("Controller %s failed to start", self.name)
else:
logger.warning("Controller %s exited", self.name)
logger.warning("Restarting in %.1f seconds", self.retry_timer)
yield from asyncio.sleep(self.retry_timer)
logger.warning("Restarting in %.1f seconds", self.retry_timer_cur)
yield from asyncio.sleep(self.retry_timer_cur)
self.retry_timer_cur *= self.retry_timer_backoff
except asyncio.CancelledError:
yield from self._terminate()

@asyncio.coroutine
def _terminate(self):
logger.info("Terminating controller %s", self.name)
if self.process is not None and self.process.returncode is None:
self.process.send_signal(signal.SIGTERM)
logger.debug("Signal sent")
try:
yield from asyncio_process_wait_timeout(self.process, 5.0)
except asyncio.TimeoutError:
logger.warning("Controller %s did not respond to SIGTERM",
yield from asyncio.wait_for(self._call_controller("terminate"),
self.term_timeout)
except:
logger.warning("Controller %s did not respond to terminate "
"command, killing", self.name)
self.process.kill()
try:
yield from asyncio_process_wait_timeout(self.process,
self.term_timeout)
except:
logger.warning("Controller %s failed to exit, killing",
self.name)
self.process.send_signal(signal.SIGKILL)
self.process.kill()
yield from self.process.wait()
logger.debug("Controller %s terminated", self.name)


def get_ip_addresses(host):
35 changes: 26 additions & 9 deletions artiq/protocols/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -240,7 +240,8 @@ class BestEffortClient:
network errors are suppressed and connections are retried in the
background.
RPC calls that failed because of network errors return ``None``.
RPC calls that failed because of network errors return ``None``. Other RPC
calls are blocking and return the correct value.
:param firstcon_timeout: Timeout to use during the first (blocking)
connection attempt at object initialization.
@@ -397,12 +398,19 @@ class Server(_AsyncioServer):
exposed to the client. Keys are names identifying each object.
Clients select one of these objects using its name upon connection.
:param id_parameters: An optional human-readable string giving more
information about the parameters of the server.
information about the server.
:param builtin_terminate: If set, the server provides a built-in
``terminate`` method that unblocks any tasks waiting on
``wait_terminate``. This is useful to handle server termination
requests from clients.
"""
def __init__(self, targets, id_parameters=None):
def __init__(self, targets, id_parameters=None, builtin_terminate=False):
_AsyncioServer.__init__(self)
self.targets = targets
self.id_parameters = id_parameters
self.builtin_terminate = builtin_terminate
if builtin_terminate:
self._terminate_request = asyncio.Event()

@asyncio.coroutine
def _handle_connection_cr(self, reader, writer):
@@ -448,9 +456,13 @@ def _handle_connection_cr(self, reader, writer):
obj = {"status": "ok", "ret": doc}
elif obj["action"] == "call":
logger.debug("calling %s", _PrettyPrintCall(obj))
method = getattr(target, obj["name"])
ret = method(*obj["args"], **obj["kwargs"])
obj = {"status": "ok", "ret": ret}
if self.builtin_terminate and obj["name"] == "terminate":
self._terminate_request.set()
obj = {"status": "ok", "ret": None}
else:
method = getattr(target, obj["name"])
ret = method(*obj["args"], **obj["kwargs"])
obj = {"status": "ok", "ret": ret}
else:
raise ValueError("Unknown action: {}"
.format(obj["action"]))
@@ -462,18 +474,23 @@ def _handle_connection_cr(self, reader, writer):
finally:
writer.close()

@asyncio.coroutine
def wait_terminate(self):
yield from self._terminate_request.wait()


def simple_server_loop(targets, host, port, id_parameters=None):
"""Runs a server until an exception is raised (e.g. the user hits Ctrl-C).
"""Runs a server until an exception is raised (e.g. the user hits Ctrl-C)
or termination is requested by a client.
See ``Server`` for a description of the parameters.
"""
loop = asyncio.get_event_loop()
try:
server = Server(targets, id_parameters)
server = Server(targets, id_parameters, True)
loop.run_until_complete(server.start(host, port))
try:
loop.run_forever()
loop.run_until_complete(server.wait_terminate())
finally:
loop.run_until_complete(server.stop())
finally: