Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 80805407bf0d
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 998db5121bf0
Choose a head ref
  • 2 commits
  • 5 files changed
  • 1 contributor

Commits on Aug 11, 2015

  1. Copy the full SHA
    a6ab066 View commit details
  2. Copy the full SHA
    998db51 View commit details
Showing with 92 additions and 63 deletions.
  1. +70 −43 artiq/frontend/artiq_ctlmgr.py
  2. +5 −5 artiq/frontend/artiq_rpctool.py
  3. +1 −1 artiq/frontend/pdq2_controller.py
  4. +14 −14 artiq/protocols/pc_rpc.py
  5. +2 −0 doc/manual/default_network_ports.rst
113 changes: 70 additions & 43 deletions artiq/frontend/artiq_ctlmgr.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#!/usr/bin/env python3

import asyncio
import atexit
import argparse
import os
import logging
import shlex
import socket

from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient
from artiq.protocols.pc_rpc import AsyncioClient, Server
from artiq.tools import verbosity_args, init_logger
from artiq.tools import asyncio_process_wait_timeout
from artiq.tools import TaskObject, asyncio_process_wait_timeout, Condition


logger = logging.getLogger(__name__)
@@ -29,8 +30,11 @@ def get_argparser():
"--retry-master", default=5.0, type=float,
help="retry timer for reconnecting to master")
parser.add_argument(
"--retry-command", default=5.0, type=float,
help="retry timer for restarting a controller command")
"--bind", default="::1",
help="hostname or IP address to bind to")
parser.add_argument(
"--bind-port", default=3249, type=int,
help="TCP port to listen to for control (default: %(default)d)")
return parser


@@ -48,6 +52,7 @@ def __init__(self, name, ddb_entry):
self.term_timeout = ddb_entry.get("term_timeout", 30)

self.retry_timer_cur = self.retry_timer
self.retry_now = Condition()
self.process = None
self.launch_task = asyncio.Task(self.launcher())

@@ -109,8 +114,13 @@ def launcher(self):
logger.warning("Controller %s failed to start", self.name)
else:
logger.warning("Controller %s exited", self.name)
logger.warning("Restarting in %.1f seconds", self.retry_timer_cur)
yield from asyncio.sleep(self.retry_timer_cur)
logger.warning("Restarting in %.1f seconds",
self.retry_timer_cur)
try:
yield from asyncio.wait_for(self.retry_now.wait(),
self.retry_timer_cur)
except asyncio.TimeoutError:
pass
self.retry_timer_cur *= self.retry_timer_backoff
except asyncio.CancelledError:
yield from self._terminate()
@@ -208,34 +218,47 @@ def sync_struct_init(self, init):
return self.current_controllers


@asyncio.coroutine
def ctlmgr(server, port, retry_master):
controller_db = ControllerDB()
try:
subscriber = Subscriber("devices", controller_db.sync_struct_init)
while True:
try:
def set_host_filter():
s = subscriber.writer.get_extra_info("socket")
localhost = s.getsockname()[0]
controller_db.set_host_filter(localhost)
yield from subscriber.connect(server, port, set_host_filter)
class ControllerManager(TaskObject):
def __init__(self, server, port, retry_master):
self.server = server
self.port = port
self.retry_master = retry_master
self.controller_db = ControllerDB()

@asyncio.coroutine
def _do(self):
try:
subscriber = Subscriber("devices",
self.controller_db.sync_struct_init)
while True:
try:
yield from asyncio.wait_for(subscriber.receive_task, None)
finally:
yield from subscriber.close()
except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError) as e:
logger.warning("Connection to master failed (%s: %s)",
e.__class__.__name__, str(e))
else:
logger.warning("Connection to master lost")
logger.warning("Retrying in %.1f seconds", retry_master)
yield from asyncio.sleep(retry_master)
except asyncio.CancelledError:
pass
finally:
yield from controller_db.current_controllers.shutdown()
def set_host_filter():
s = subscriber.writer.get_extra_info("socket")
localhost = s.getsockname()[0]
self.controller_db.set_host_filter(localhost)
yield from subscriber.connect(self.server, self.port,
set_host_filter)
try:
yield from asyncio.wait_for(subscriber.receive_task, None)
finally:
yield from subscriber.close()
except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError) as e:
logger.warning("Connection to master failed (%s: %s)",
e.__class__.__name__, str(e))
else:
logger.warning("Connection to master lost")
logger.warning("Retrying in %.1f seconds", self.retry_master)
yield from asyncio.sleep(self.retry_master)
except asyncio.CancelledError:
pass
finally:
yield from self.controller_db.current_controllers.shutdown()

def retry_now(self, k):
"""If a controller is disabled and pending retry, perform that retry
now."""
self.controller_db.current_controllers.active[k].retry_now.notify()


def main():
@@ -247,18 +270,22 @@ def main():
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
atexit.register(lambda: loop.close())

try:
task = asyncio.Task(ctlmgr(
args.server, args.port, args.retry_master))
try:
loop.run_forever()
finally:
task.cancel()
loop.run_until_complete(asyncio.wait_for(task, None))
ctlmgr = ControllerManager(args.server, args.port, args.retry_master)
ctlmgr.start()
atexit.register(lambda: loop.run_until_complete(ctlmgr.stop()))

class CtlMgrRPC:
retry_now = ctlmgr.retry_now

rpc_target = CtlMgrRPC()
rpc_server = Server({"ctlmgr": rpc_target}, builtin_terminate=True)
loop.run_until_complete(rpc_server.start(args.bind, args.bind_port))
atexit.register(lambda: loop.run_until_complete(rpc_server.stop()))

loop.run_until_complete(rpc_server.wait_terminate())

finally:
loop.close()

if __name__ == "__main__":
main()
10 changes: 5 additions & 5 deletions artiq/frontend/artiq_rpctool.py
Original file line number Diff line number Diff line change
@@ -29,10 +29,10 @@ def get_argparser():
return parser


def list_targets(target_names, id_parameters):
def list_targets(target_names, description):
print("Target(s): " + ", ".join(target_names))
if id_parameters is not None:
print("Parameters: " + id_parameters)
if description is not None:
print("Description: " + description)


def list_methods(remote):
@@ -85,7 +85,7 @@ def main():

remote = Client(args.server, args.port, None)

targets, id_parameters = remote.get_rpc_id()
targets, description = remote.get_rpc_id()

if args.action != "list-targets":
# If no target specified and remote has only one, then use this one.
@@ -99,7 +99,7 @@ def main():
remote.select_rpc_target(args.target)

if args.action == "list-targets":
list_targets(targets, id_parameters)
list_targets(targets, description)
elif args.action == "list-methods":
list_methods(remote)
elif args.action == "call":
2 changes: 1 addition & 1 deletion artiq/frontend/pdq2_controller.py
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ def main():
dev = Pdq2(url=args.device, dev=port)
try:
simple_server_loop({"pdq2": dev}, args.bind, args.port,
id_parameters="device=" + str(args.device))
description="device=" + str(args.device))
finally:
dev.close()

28 changes: 14 additions & 14 deletions artiq/protocols/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ def __init__(self, host, port, target_name):

server_identification = self.__recv()
self.__target_names = server_identification["targets"]
self.__id_parameters = server_identification["parameters"]
self.__description = server_identification["description"]
if target_name is not None:
self.select_rpc_target(target_name)
except:
@@ -94,9 +94,9 @@ def select_rpc_target(self, target_name):
self.__socket.sendall((target_name + "\n").encode())

def get_rpc_id(self):
"""Returns a tuple (target_names, id_parameters) containing the
"""Returns a tuple (target_names, description) containing the
identification information of the server."""
return (self.__target_names, self.__id_parameters)
return (self.__target_names, self.__description)

def close_rpc(self):
"""Closes the connection to the RPC server.
@@ -157,7 +157,7 @@ def __init__(self):
self.__reader = None
self.__writer = None
self.__target_names = None
self.__id_parameters = None
self.__description = None

@asyncio.coroutine
def connect_rpc(self, host, port, target_name):
@@ -170,7 +170,7 @@ def connect_rpc(self, host, port, target_name):
self.__writer.write(_init_string)
server_identification = yield from self.__recv()
self.__target_names = server_identification["targets"]
self.__id_parameters = server_identification["parameters"]
self.__description = server_identification["description"]
if target_name is not None:
self.select_rpc_target(target_name)
except:
@@ -186,9 +186,9 @@ def select_rpc_target(self, target_name):
self.__writer.write((target_name + "\n").encode())

def get_rpc_id(self):
"""Returns a tuple (target_names, id_parameters) containing the
"""Returns a tuple (target_names, description) containing the
identification information of the server."""
return (self.__target_names, self.__id_parameters)
return (self.__target_names, self.__description)

def close_rpc(self):
"""Closes the connection to the RPC server.
@@ -199,7 +199,7 @@ def close_rpc(self):
self.__reader = None
self.__writer = None
self.__target_names = None
self.__id_parameters = None
self.__description = None

def __send(self, obj):
line = pyon.encode(obj) + "\n"
@@ -398,17 +398,17 @@ class Server(_AsyncioServer):
:param targets: A dictionary of objects providing the RPC methods to be
exposed to the client. Keys are names identifying each object.
Clients select one of these objects using its name upon connection.
:param id_parameters: An optional human-readable string giving more
:param description: An optional human-readable string giving more
information about the server.
:param builtin_terminate: If set, the server provides a built-in
``terminate`` method that unblocks any tasks waiting on
``wait_terminate``. This is useful to handle server termination
requests from clients.
"""
def __init__(self, targets, id_parameters=None, builtin_terminate=False):
def __init__(self, targets, description=None, builtin_terminate=False):
_AsyncioServer.__init__(self)
self.targets = targets
self.id_parameters = id_parameters
self.description = description
self.builtin_terminate = builtin_terminate
if builtin_terminate:
self._terminate_request = asyncio.Event()
@@ -422,7 +422,7 @@ def _handle_connection_cr(self, reader, writer):

obj = {
"targets": sorted(self.targets.keys()),
"parameters": self.id_parameters
"description": self.description
}
line = pyon.encode(obj) + "\n"
writer.write(line.encode())
@@ -480,15 +480,15 @@ def wait_terminate(self):
yield from self._terminate_request.wait()


def simple_server_loop(targets, host, port, id_parameters=None):
def simple_server_loop(targets, host, port, description=None):
"""Runs a server until an exception is raised (e.g. the user hits Ctrl-C)
or termination is requested by a client.
See ``Server`` for a description of the parameters.
"""
loop = asyncio.get_event_loop()
try:
server = Server(targets, id_parameters, True)
server = Server(targets, description, True)
loop.run_until_complete(server.start(host, port))
try:
loop.run_until_complete(server.wait_terminate())
2 changes: 2 additions & 0 deletions doc/manual/default_network_ports.rst
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@ Default network ports
+--------------------------+--------------+
| Core device (mon/inj) | 3250 (UDP) |
+--------------------------+--------------+
| Controller manager | 3249 |
+--------------------------+--------------+
| Master (notifications) | 3250 |
+--------------------------+--------------+
| Master (control) | 3251 |