Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 650baa9fc1cc
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 7354117af251
Choose a head ref
  • 2 commits
  • 2 files changed
  • 1 contributor

Commits on Jan 5, 2015

  1. pc_rpc: asyncio client

    sbourdeauducq committed Jan 5, 2015
    Copy the full SHA
    f9dd568 View commit details
  2. Copy the full SHA
    7354117 View commit details
Showing with 181 additions and 46 deletions.
  1. +125 −26 artiq/management/pc_rpc.py
  2. +56 −20 artiq/test/pc_rpc.py
151 changes: 125 additions & 26 deletions artiq/management/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
import traceback

from artiq.management import pyon
from artiq.management.tools import AsyncioServer
from artiq.management.tools import AsyncioServer as _AsyncioServer


class RemoteError(Exception):
@@ -70,59 +70,62 @@ class Client:
"""
def __init__(self, host, port, target_name):
self._socket = socket.create_connection((host, port))
self._socket.sendall(_init_string)
self.__socket = socket.create_connection((host, port))

server_identification = self._recv()
self._target_names = server_identification["targets"]
self._id_parameters = server_identification["parameters"]
if target_name is not None:
self.select_rpc_target(target_name)
try:
self.__socket.sendall(_init_string)

server_identification = self.__recv()
self.__target_names = server_identification["targets"]
self.__id_parameters = server_identification["parameters"]
if target_name is not None:
self.select_rpc_target(target_name)
except:
self.__socket.close()
raise

def select_rpc_target(self, target_name):
"""Selects a RPC target by name. This function should be called
exactly once if the object was created with ``target_name=None``.
"""
if target_name not in self._target_names:
if target_name not in self.__target_names:
raise IncompatibleServer
self._socket.sendall((target_name + "\n").encode())
self.__socket.sendall((target_name + "\n").encode())

def get_rpc_id(self):
"""Returns a tuple (target_names, id_parameters) containing the
identification information of the server.
"""
return (self._target_names, self._id_parameters)
return (self.__target_names, self.__id_parameters)

def close_rpc(self):
"""Closes the connection to the RPC server.
No further method calls should be done after this method is called.
"""
self._socket.close()
self.__socket.close()

def _send(self, obj):
def __send(self, obj):
line = pyon.encode(obj) + "\n"
self._socket.sendall(line.encode())
self.__socket.sendall(line.encode())

def _recv(self):
buf = self._socket.recv(4096).decode()
def __recv(self):
buf = self.__socket.recv(4096).decode()
while "\n" not in buf:
more = self._socket.recv(4096)
more = self.__socket.recv(4096)
if not more:
break
buf += more.decode()
obj = pyon.decode(buf)

return obj
return pyon.decode(buf)

def _do_rpc(self, name, args, kwargs):
def __do_rpc(self, name, args, kwargs):
obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs}
self._send(obj)
self.__send(obj)

obj = self._recv()
obj = self.__recv()
if obj["status"] == "ok":
return obj["ret"]
elif obj["status"] == "failed":
@@ -132,11 +135,107 @@ def _do_rpc(self, name, args, kwargs):

def __getattr__(self, name):
def proxy(*args, **kwargs):
return self._do_rpc(name, args, kwargs)
return self.__do_rpc(name, args, kwargs)
return proxy


class AsyncioClient:
    """This class is similar to :class:`artiq.management.pc_rpc.Client`, but
    uses ``asyncio`` instead of blocking calls.

    All RPC methods are coroutines. The object is created unconnected;
    ``connect_rpc`` must complete before any RPC method is used.
    """
    def __init__(self):
        # Serializes request/response pairs so that concurrent RPC calls
        # cannot interleave their traffic on the single connection.
        self.__lock = asyncio.Lock()
        self.__reader = None
        self.__writer = None
        self.__target_names = None
        self.__id_parameters = None

    @asyncio.coroutine
    def connect_rpc(self, host, port, target_name):
        """Connects to the server. This cannot be done in __init__ because
        this method is a coroutine. See ``Client`` for a description of the
        parameters.

        If anything fails after the TCP connection has been opened, the
        connection is closed again before the exception is propagated.
        """
        self.__reader, self.__writer = \
            yield from asyncio.open_connection(host, port)
        try:
            self.__writer.write(_init_string)
            server_identification = yield from self.__recv()
            self.__target_names = server_identification["targets"]
            self.__id_parameters = server_identification["parameters"]
            if target_name is not None:
                self.select_rpc_target(target_name)
        except BaseException:
            # Cleanup only: release the connection, then re-raise unchanged.
            self.close_rpc()
            raise

    def select_rpc_target(self, target_name):
        """Selects a RPC target by name. This function should be called
        exactly once if the connection was created with ``target_name=None``.

        Raises ``IncompatibleServer`` if the server did not advertise
        ``target_name``.
        """
        if target_name not in self.__target_names:
            raise IncompatibleServer
        self.__writer.write((target_name + "\n").encode())

    def get_rpc_id(self):
        """Returns a tuple (target_names, id_parameters) containing the
        identification information of the server.
        """
        return (self.__target_names, self.__id_parameters)

    def close_rpc(self):
        """Closes the connection to the RPC server.
        No further method calls should be done after this method is called.

        Calling this method on an object that is not connected (never
        connected, or already closed) is a no-op, so it is safe to use in
        ``finally`` blocks.
        """
        writer = self.__writer
        # Reset the state first so the object is left consistent even if
        # closing the transport raises.
        self.__reader = None
        self.__writer = None
        self.__target_names = None
        self.__id_parameters = None
        if writer is not None:
            writer.close()

    def __send(self, obj):
        # One PYON-encoded object per line.
        line = pyon.encode(obj) + "\n"
        self.__writer.write(line.encode())

    @asyncio.coroutine
    def __recv(self):
        # Replies are newline-terminated, mirroring __send.
        line = yield from self.__reader.readline()
        return pyon.decode(line.decode())

    @asyncio.coroutine
    def __do_rpc(self, name, args, kwargs):
        # Hold the lock for the whole request/response exchange.
        yield from self.__lock.acquire()
        try:
            obj = {"action": "call", "name": name,
                   "args": args, "kwargs": kwargs}
            self.__send(obj)

            obj = yield from self.__recv()
            if obj["status"] == "ok":
                return obj["ret"]
            elif obj["status"] == "failed":
                raise RemoteError(obj["message"])
            else:
                raise ValueError
        finally:
            self.__lock.release()

    def __getattr__(self, name):
        # Any attribute not found by normal lookup is exposed as a coroutine
        # that performs the RPC call of the same name.
        @asyncio.coroutine
        def proxy(*args, **kwargs):
            return self.__do_rpc(name, args, kwargs)
        return proxy


class Server(AsyncioServer):
class Server(_AsyncioServer):
"""This class creates a TCP server that handles requests coming from
``Client`` objects.
@@ -154,7 +253,7 @@ class Server(AsyncioServer):
"""
def __init__(self, targets, id_parameters=None):
AsyncioServer.__init__(self)
_AsyncioServer.__init__(self)
self.targets = targets
self.id_parameters = id_parameters

76 changes: 56 additions & 20 deletions artiq/test/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -11,40 +11,76 @@

# Address and port the test client connects to ("::1" is the IPv6 loopback).
test_address = "::1"
test_port = 7777
# Value sent through the remote ``echo`` method and compared with the reply.
test_object = [5, 2.1, None, True, False,
{"a": 5, 2: np.linspace(0, 10, 1)},
(4, 5), (10,), "ab\nx\"'"]


class RPCCase(unittest.TestCase):
def test_echo(self):
def _run_server_and_test(self, test):
# running this file outside of unittest starts the echo server
with subprocess.Popen([sys.executable,
sys.modules[__name__].__file__]) as proc:
try:
test_object = [5, 2.1, None, True, False,
{"a": 5, 2: np.linspace(0, 10, 1)},
(4, 5), (10,), "ab\nx\"'"]
for attempt in range(100):
time.sleep(.2)
try:
remote = pc_rpc.Client(test_address, test_port,
"test")
except ConnectionRefusedError:
pass
else:
break
try:
test_object_back = remote.echo(test_object)
with self.assertRaises(pc_rpc.RemoteError):
remote.non_existing_method()
remote.quit()
finally:
remote.close_rpc()
test()
finally:
try:
proc.wait(timeout=1)
except subprocess.TimeoutExpired:
proc.kill()
raise

def _blocking_echo(self):
    """Run the echo scenario with the blocking ``pc_rpc.Client``.

    Retries the connection up to 100 times, 0.2 s apart, while the server
    is still starting up. Fails the test explicitly if the server never
    accepts the connection.
    """
    for attempt in range(100):
        time.sleep(.2)
        try:
            remote = pc_rpc.Client(test_address, test_port,
                                   "test")
        except ConnectionRefusedError:
            pass
        else:
            break
    else:
        # Without this, ``remote`` would be unbound below and the real
        # problem would be hidden behind an UnboundLocalError.
        self.fail("could not connect to the test RPC server")
    try:
        test_object_back = remote.echo(test_object)
        self.assertEqual(test_object, test_object_back)
        with self.assertRaises(pc_rpc.RemoteError):
            remote.non_existing_method()
        remote.quit()
    finally:
        remote.close_rpc()

def test_blocking_echo(self):
    """Exercise the blocking client against a freshly started server."""
    scenario = self._blocking_echo
    self._run_server_and_test(scenario)

@asyncio.coroutine
def _asyncio_echo(self):
    """Run the echo scenario with ``pc_rpc.AsyncioClient`` (coroutine).

    Retries the connection up to 100 times, 0.2 s apart, while the server
    is still starting up. Fails the test explicitly if the server never
    accepts the connection.
    """
    remote = pc_rpc.AsyncioClient()
    for attempt in range(100):
        yield from asyncio.sleep(.2)
        try:
            yield from remote.connect_rpc(test_address, test_port, "test")
        except ConnectionRefusedError:
            pass
        else:
            break
    else:
        # Never connected: report that, rather than continuing with an
        # unconnected client and failing with an unrelated error.
        self.fail("could not connect to the test RPC server")
    try:
        test_object_back = yield from remote.echo(test_object)
        self.assertEqual(test_object, test_object_back)
        with self.assertRaises(pc_rpc.RemoteError):
            yield from remote.non_existing_method()
        yield from remote.quit()
    finally:
        remote.close_rpc()

def _loop_asyncio_echo(self):
    """Drive the ``_asyncio_echo`` coroutine to completion.

    A fresh event loop is created and installed as the current one, so
    this method does not depend on (or close) a default loop shared with
    other tests; the loop it created is closed afterwards.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(self._asyncio_echo())
    finally:
        loop.close()

def test_asyncio_echo(self):
    """Exercise the asyncio client against a freshly started server."""
    scenario = self._loop_asyncio_echo
    self._run_server_and_test(scenario)


class Echo: