Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 47c0352505ef
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 171ed70f2aac
Choose a head ref
  • 4 commits
  • 9 files changed
  • 1 contributor

Commits on Oct 27, 2014

  1. pyon: add doc

    sbourdeauducq committed Oct 27, 2014
    Copy the full SHA
    27fc19e View commit details
  2. pc_rpc: document

    sbourdeauducq committed Oct 27, 2014
    Copy the full SHA
    934442b View commit details
  3. Copy the full SHA
    ea37274 View commit details
  4. Copy the full SHA
    171ed70 View commit details
Showing with 148 additions and 58 deletions.
  1. +0 −6 artiq/devices/pdq2/pdq2-client
  2. +4 −5 artiq/devices/pdq2/pdq2-controller
  3. +81 −24 artiq/management/pc_rpc.py
  4. +28 −0 artiq/management/pyon.py
  5. +1 −0 doc/manual/index.rst
  6. +14 −0 doc/manual/management_reference.rst
  7. +0 −6 frontend/artiq
  8. +3 −13 frontend/artiqd
  9. +17 −4 test/pc_rpc.py
6 changes: 0 additions & 6 deletions artiq/devices/pdq2/pdq2-client
Original file line number Diff line number Diff line change
@@ -19,9 +19,6 @@ def _get_args():
help="hostname or IP of the master to connect to")
parser.add_argument("--port", default=8889, type=int,
help="TCP port to use to connect to the master")
parser.add_argument("-q", "--quit-controller", default=False,
action="store_true",
help="causes the controller to quit")
parser.add_argument("-c", "--channel", default=0, type=int,
help="channel: 3*board_num+dac_num [%(default)s]")
parser.add_argument("-f", "--frame", default=0, type=int,
@@ -58,9 +55,6 @@ def _get_args():
def _main():
args = _get_args()
dev = Client(args.server, args.port)
if args.quit_controller:
dev.quit()
return
dev.init()

if args.reset:
9 changes: 4 additions & 5 deletions artiq/devices/pdq2/pdq2-controller
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ import asyncio
from scipy import interpolate
import numpy as np

from artiq.management.pc_rpc import Server, WaitQuit
from artiq.management.pc_rpc import Server


logger = logging.getLogger("Pdq2")
@@ -83,7 +83,7 @@ if Ftdi is None:
Ftdi = FileFtdi


class Pdq2(WaitQuit):
class Pdq2:
"""
PDQ DAC (a.k.a. QC_Waveform)
"""
@@ -102,7 +102,6 @@ class Pdq2(WaitQuit):
}

def __init__(self, serial=None):
WaitQuit.__init__(self)
self.serial = serial
self.dev = Ftdi(serial)

@@ -363,9 +362,9 @@ def main():
server = Server(dev)
loop.run_until_complete(server.start(args.bind, args.port))
try:
loop.run_until_complete(dev.wait_quit())
loop.run_forever()
finally:
loop.run_until_complete(server.stop())
loop.run_until_complete(server.stop())
finally:
loop.close()
finally:
105 changes: 81 additions & 24 deletions artiq/management/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
"""
This module provides a remote procedure call (RPC) mechanism over sockets
between conventional computers (PCs) running Python. It strives to be
transparent and uses ``artiq.management.pyon`` internally so that e.g. Numpy
arrays can be easily used.
"""

import socket
import asyncio
import traceback
@@ -6,17 +14,50 @@


class RemoteError(Exception):
    """Raised when an RPC fails, or when the called method raises an
    exception on the remote (server) side.
    """


class Client:
"""This class proxies the methods available on the server so that they
can be used as if they were local methods.
For example, if the server provides method ``foo``, and ``c`` is a local
``Client`` object, then the method can be called as: ::
result = c.foo(param1, param2)
The parameters and the result are automatically transferred with the
server.
Only methods are supported. Attributes must be accessed by providing and
using "get" and/or "set" methods on the server side.
At object initialization, the connection to the remote server is
automatically attempted. The user must call ``close_rpc`` to
free resources properly after initialization completes successfully.
:param host: Identifier of the server. The string can represent a
hostname or an IPv4 or IPv6 address (see
``socket.create_connection`` in the Python standard library).
:param port: TCP port to use.
"""
def __init__(self, host, port):
self.socket = socket.create_connection((host, port))

def close_rpc(self):
"""Closes the connection to the RPC server.
No further method calls should be done after this method is called.
"""
self.socket.close()

def do_rpc(self, name, args, kwargs):
def _do_rpc(self, name, args, kwargs):
obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs}
line = pyon.encode(obj) + "\n"
self.socket.sendall(line.encode())
@@ -37,38 +78,66 @@ def do_rpc(self, name, args, kwargs):

def __getattr__(self, name):
def proxy(*args, **kwargs):
return self.do_rpc(name, args, kwargs)
return self._do_rpc(name, args, kwargs)
return proxy


class Server:
"""This class creates a TCP server that handles requests coming from
``Client`` objects.
The server is designed using ``asyncio`` so that it can easily support
multiple connections without the locking issues that arise in
multi-threaded applications. Multiple connection support is useful even in
simple cases: it allows new connections to be accepted even when the
previous client failed to properly shut down its connection.
:param target: Object providing the RPC methods to be exposed to the
client.
"""
def __init__(self, target):
self.target = target
self.client_tasks = set()
self._client_tasks = set()

@asyncio.coroutine
def start(self, host, port):
self.server = yield from asyncio.start_server(self.handle_connection,
"""Starts the server.
The user must call ``stop`` to free resources properly after this
method completes successfully.
This method is a `coroutine`.
:param host: Bind address of the server (see ``asyncio.start_server``
from the Python standard library).
:param port: TCP port to bind to.
"""
self.server = yield from asyncio.start_server(self._handle_connection,
host, port)

@asyncio.coroutine
def stop(self):
for task in self.client_tasks:
"""Stops the server.
"""
for task in self._client_tasks:
task.cancel()
self.server.close()
yield from self.server.wait_closed()
del self.server

def client_done(self, task):
self.client_tasks.remove(task)
def _client_done(self, task):
self._client_tasks.remove(task)

def handle_connection(self, reader, writer):
task = asyncio.Task(self.handle_connection_task(reader, writer))
self.client_tasks.add(task)
task.add_done_callback(self.client_done)
def _handle_connection(self, reader, writer):
task = asyncio.Task(self._handle_connection_task(reader, writer))
self._client_tasks.add(task)
task.add_done_callback(self._client_done)

@asyncio.coroutine
def handle_connection_task(self, reader, writer):
def _handle_connection_task(self, reader, writer):
try:
while True:
line = yield from reader.readline()
@@ -89,15 +158,3 @@ def handle_connection_task(self, reader, writer):
writer.write(line.encode())
finally:
writer.close()


class WaitQuit:
    """Base class that lets an object signal termination to a waiting
    coroutine: ``quit`` releases a semaphore that ``wait_quit`` blocks on.
    """
    def __init__(self):
        # Initial value 0: acquire() blocks until release() has been called.
        self.terminate_notify = asyncio.Semaphore(0)

    @asyncio.coroutine
    def wait_quit(self):
        """Coroutine that completes once the semaphore can be acquired,
        i.e. after ``quit`` has been called.
        """
        yield from self.terminate_notify.acquire()

    def quit(self):
        """Releases the semaphore, allowing one pending (or future)
        ``wait_quit`` to complete.
        """
        self.terminate_notify.release()
28 changes: 28 additions & 0 deletions artiq/management/pyon.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
"""
This module provides serialization and deserialization functions for Python
objects. Its main features are:
* Human-readable format compatible with the Python syntax.
* Each object is serialized on a single line, with only ASCII characters.
* Supports all basic Python data structures: None, booleans, integers,
floats, strings, tuples, lists, dictionaries.
* Those data types are accurately reconstructed (unlike JSON where e.g. tuples
become lists, and dictionary keys are turned into strings).
* Supports Numpy arrays.
The main rationale for this new custom serializer (instead of using JSON) is
that JSON does not support Numpy and more generally cannot be extended with
other data types while keeping a concise syntax. Here we can use the Python
function call syntax to mark special data types.
"""


import base64

import numpy
@@ -69,6 +89,10 @@ def _encode_nparray(x):


def encode(x):
    """Serialize a Python object to a string written in Python syntax.

    The serializer is selected from ``_encode_map`` using ``type(x)`` as the
    key and is then applied to ``x``.

    :param x: Object to serialize.
    :returns: Whatever the selected serializer returns for ``x``
        (presumably the string representation; the serializers are defined
        elsewhere in the module).
    """
    serializer = _encode_map[type(x)]
    return serializer(x)


@@ -78,4 +102,8 @@ def _nparray(shape, dtype, data):


def decode(s):
    """Parses a string in the Python syntax, reconstructs the corresponding
    object, and returns it.

    :param s: String to parse (presumably the output of ``encode``).
    :returns: The object obtained by evaluating ``s``.
    """
    # The string is evaluated as a Python expression. Builtins are disabled
    # and the only global name made available is ``nparray`` (bound to
    # ``_nparray``).
    # NOTE(review): eval() with ``__builtins__`` set to None is not a
    # security sandbox; attribute access on literals can still reach
    # arbitrary objects. Confirm that ``s`` never comes from an untrusted
    # peer, or replace this with a restricted parser.
    return eval(s, {"__builtins__": None, "nparray": _nparray}, {})
1 change: 1 addition & 0 deletions doc/manual/index.rst
Original file line number Diff line number Diff line change
@@ -10,3 +10,4 @@ Contents:
tutorial
core_reference
drivers_reference
management_reference
14 changes: 14 additions & 0 deletions doc/manual/management_reference.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Management reference
====================

:mod:`artiq.management.pyon` module
------------------------------------

.. automodule:: artiq.management.pyon
:members:

:mod:`artiq.management.pc_rpc` module
-------------------------------------

.. automodule:: artiq.management.pc_rpc
:members:
6 changes: 0 additions & 6 deletions frontend/artiq
Original file line number Diff line number Diff line change
@@ -17,10 +17,6 @@ def _get_args():
"-o", "--run-once", default=[], nargs=3,
action="append",
help="run experiment once. arguments: <path> <name> <timeout>")
parser.add_argument(
"-q", "--quit-master", default=False,
action="store_true",
help="causes the master to quit")
return parser.parse_args()


@@ -34,8 +30,6 @@ def main():
"path": path,
"name": name
}, int(timeout))
if args.quit_master:
remote.quit()
finally:
remote.close_rpc()

16 changes: 3 additions & 13 deletions frontend/artiqd
Original file line number Diff line number Diff line change
@@ -3,19 +3,10 @@
import asyncio
import argparse

from artiq.management.pc_rpc import Server, WaitQuit
from artiq.management.pc_rpc import Server
from artiq.management.scheduler import Scheduler


class Master(WaitQuit):
def __init__(self, scheduler):
WaitQuit.__init__(self)
self.scheduler = scheduler

def run_once(self, run_params, timeout):
self.scheduler.run_once(run_params, timeout)


def _get_args():
parser = argparse.ArgumentParser(description="PDQ2 controller")
parser.add_argument(
@@ -34,11 +25,10 @@ def main():
scheduler = Scheduler()
loop.run_until_complete(scheduler.start())
try:
master = Master(scheduler)
server = Server(master)
server = Server(scheduler)
loop.run_until_complete(server.start(args.bind, args.port))
try:
loop.run_until_complete(master.wait_quit())
loop.run_forever()
finally:
loop.run_until_complete(server.stop())
finally:
21 changes: 17 additions & 4 deletions test/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -22,8 +22,14 @@ def test_echo(self):
test_object = [5, 2.1, None, True, False,
{"a": 5, 2: np.linspace(0, 10, 1)},
(4, 5), (10,), "ab\nx\"'"]
time.sleep(.5) # wait for the server to start
remote = pc_rpc.Client(test_address, test_port)
for attempt in range(100):
time.sleep(.2)
try:
remote = pc_rpc.Client(test_address, test_port)
except ConnectionRefusedError:
pass
else:
break
try:
test_object_back = remote.echo(test_object)
with self.assertRaises(pc_rpc.RemoteError):
@@ -40,9 +46,16 @@ def test_echo(self):
self.assertEqual(test_object, test_object_back)


class Echo(pc_rpc.WaitQuit):
class Echo:
def __init__(self):
pc_rpc.WaitQuit.__init__(self)
self.terminate_notify = asyncio.Semaphore(0)

@asyncio.coroutine
def wait_quit(self):
yield from self.terminate_notify.acquire()

def quit(self):
self.terminate_notify.release()

def echo(self, x):
return x