Skip to content

Commit d2a5dfa

Browse files
committed Dec 31, 2014
pc_rpc: multiple target support
1 parent 9cd89a0 commit d2a5dfa

10 files changed

+113
-92
lines changed
 

‎artiq/management/pc_rpc.py

+71-54
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class RemoteError(Exception):
3030

3131
class IncompatibleServer(Exception):
3232
"""Raised by the client when attempting to connect to a server that does
33-
not have the expected type.
33+
not have the expected target.
3434
3535
"""
3636
pass
@@ -62,59 +62,71 @@ class Client:
6262
hostname or an IPv4 or IPv6 address (see
6363
``socket.create_connection`` in the Python standard library).
6464
:param port: TCP port to use.
65-
:param expected_id_type: Server type to expect. ``IncompatibleServer`` is
66-
raised when the types do not match. Use ``None`` to accept any server
67-
type.
65+
:param target_name: Target name to select. ``IncompatibleServer`` is
66+
raised if the target does not exist.
67+
Use ``None`` to skip selecting a target. The list of targets can then
68+
be retrieved using ``get_rpc_id`` and then one can be selected later
69+
using ``select_rpc_target``.
6870
6971
"""
70-
def __init__(self, host, port, expected_id_type):
71-
self.socket = socket.create_connection((host, port))
72-
self.socket.sendall(_init_string)
73-
self._identify(expected_id_type)
72+
def __init__(self, host, port, target_name):
73+
self._socket = socket.create_connection((host, port))
74+
self._socket.sendall(_init_string)
75+
76+
server_identification = self._recv()
77+
self._target_names = server_identification["targets"]
78+
self._id_parameters = server_identification["parameters"]
79+
if target_name is not None:
80+
self.select_rpc_target(target_name)
81+
82+
def select_rpc_target(self, target_name):
83+
"""Selects a RPC target by name. This function should be called
84+
exactly once if the object was created with ``target_name=None``.
85+
86+
"""
87+
if target_name not in self._target_names:
88+
raise IncompatibleServer
89+
self._socket.sendall((target_name + "\n").encode())
7490

7591
def get_rpc_id(self):
76-
"""Returns a dictionary containing the identification information of
77-
the server.
92+
"""Returns a tuple (target_names, id_parameters) containing the
93+
identification information of the server.
7894
7995
"""
80-
return self._server_identification
96+
return (self._target_names, self._id_parameters)
8197

8298
def close_rpc(self):
8399
"""Closes the connection to the RPC server.
84100
85101
No further method calls should be done after this method is called.
86102
87103
"""
88-
self.socket.close()
104+
self._socket.close()
89105

90-
def _send_recv(self, obj):
106+
def _send(self, obj):
91107
line = pyon.encode(obj) + "\n"
92-
self.socket.sendall(line.encode())
108+
self._socket.sendall(line.encode())
93109

94-
buf = self.socket.recv(4096).decode()
110+
def _recv(self):
111+
buf = self._socket.recv(4096).decode()
95112
while "\n" not in buf:
96-
more = self.socket.recv(4096)
113+
more = self._socket.recv(4096)
97114
if not more:
98115
break
99116
buf += more.decode()
100117
obj = pyon.decode(buf)
101118

102119
return obj
103120

104-
def _identify(self, expected_id_type):
105-
obj = {"action": "identify"}
106-
self._server_identification = self._send_recv(obj)
107-
if (expected_id_type is not None
108-
and self._server_identification["type"] != expected_id_type):
109-
raise IncompatibleServer
110-
111121
def _do_rpc(self, name, args, kwargs):
112122
obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs}
113-
obj = self._send_recv(obj)
114-
if obj["result"] == "ok":
123+
self._send(obj)
124+
125+
obj = self._recv()
126+
if obj["status"] == "ok":
115127
return obj["ret"]
116-
elif obj["result"] == "error":
117-
raise RemoteError(obj["message"] + "\n" + obj["traceback"])
128+
elif obj["status"] == "failed":
129+
raise RemoteError(obj["message"])
118130
else:
119131
raise ValueError
120132

@@ -134,18 +146,16 @@ class Server(AsyncioServer):
134146
simple cases: it allows new connections to be accepted even when the
135147
previous client failed to properly shut down its connection.
136148
137-
:param target: Object providing the RPC methods to be exposed to the
138-
client.
139-
:param id_type: A string identifying the server type. Clients use it to
140-
verify that they are connected to the proper server.
149+
:param targets: A dictionary of objects providing the RPC methods to be
150+
exposed to the client. Keys are names identifying each object.
151+
Clients select one of these objects using its name upon connection.
141152
:param id_parameters: An optional human-readable string giving more
142153
information about the parameters of the server.
143154
144155
"""
145-
def __init__(self, target, id_type, id_parameters=None):
156+
def __init__(self, targets, id_parameters=None):
146157
AsyncioServer.__init__(self)
147-
self.target = target
148-
self.id_type = id_type
158+
self.targets = targets
149159
self.id_parameters = id_parameters
150160

151161
@asyncio.coroutine
@@ -154,42 +164,49 @@ def _handle_connection_cr(self, reader, writer):
154164
line = yield from reader.readline()
155165
if line != _init_string:
156166
return
167+
168+
obj = {
169+
"targets": sorted(self.targets.keys()),
170+
"parameters": self.id_parameters
171+
}
172+
line = pyon.encode(obj) + "\n"
173+
writer.write(line.encode())
174+
line = yield from reader.readline()
175+
if not line:
176+
return
177+
target_name = line.decode()[:-1]
178+
try:
179+
target = self.targets[target_name]
180+
except KeyError:
181+
return
182+
157183
while True:
158184
line = yield from reader.readline()
159185
if not line:
160186
break
161187
obj = pyon.decode(line.decode())
162-
action = obj["action"]
163-
if action == "call":
164-
try:
165-
method = getattr(self.target, obj["name"])
166-
ret = method(*obj["args"], **obj["kwargs"])
167-
obj = {"result": "ok", "ret": ret}
168-
except Exception as e:
169-
obj = {"result": "error",
170-
"message": type(e).__name__ + ": " + str(e),
171-
"traceback": traceback.format_exc()}
172-
line = pyon.encode(obj) + "\n"
173-
writer.write(line.encode())
174-
elif action == "identify":
175-
obj = {"type": self.id_type}
176-
if self.id_parameters is not None:
177-
obj["parameters"] = self.id_parameters
178-
line = pyon.encode(obj) + "\n"
179-
writer.write(line.encode())
188+
try:
189+
method = getattr(target, obj["name"])
190+
ret = method(*obj["args"], **obj["kwargs"])
191+
obj = {"status": "ok", "ret": ret}
192+
except Exception:
193+
obj = {"status": "failed",
194+
"message": traceback.format_exc()}
195+
line = pyon.encode(obj) + "\n"
196+
writer.write(line.encode())
180197
finally:
181198
writer.close()
182199

183200

184-
def simple_server_loop(target, id_type, host, port, id_parameters=None):
201+
def simple_server_loop(targets, host, port, id_parameters=None):
185202
"""Runs a server until an exception is raised (e.g. the user hits Ctrl-C).
186203
187204
See ``Server`` for a description of the parameters.
188205
189206
"""
190207
loop = asyncio.get_event_loop()
191208
try:
192-
server = Server(target, id_type, id_parameters)
209+
server = Server(targets, id_parameters)
193210
loop.run_until_complete(server.start(host, port))
194211
try:
195212
loop.run_forever()

‎artiq/test/pc_rpc.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def run_server():
6666
loop = asyncio.get_event_loop()
6767
try:
6868
echo = Echo()
69-
server = pc_rpc.Server(echo, "test")
69+
server = pc_rpc.Server({"test": echo})
7070
loop.run_until_complete(server.start(test_address, test_port))
7171
try:
7272
loop.run_until_complete(echo.wait_quit())

‎doc/manual/drivers_reference.rst

+11-9
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ Default TCP port list
3030

3131
When writing a new driver, choose a free TCP port and add it to this list.
3232

33-
+-----------+--------------+
34-
| Component | Default port |
35-
+===========+==============+
36-
| Master | 8888 |
37-
+-----------+--------------+
38-
| PDQ2 | 8889 |
39-
+-----------+--------------+
40-
| LDA | 8890 |
41-
+-----------+--------------+
33+
+--------------------------+--------------+
34+
| Component | Default port |
35+
+==========================+==============+
36+
| Master (notifications) | 8887 |
37+
+--------------------------+--------------+
38+
| Master (control) | 8888 |
39+
+--------------------------+--------------+
40+
| PDQ2 | 8889 |
41+
+--------------------------+--------------+
42+
| LDA | 8890 |
43+
+--------------------------+--------------+

‎doc/manual/writing_a_driver.rst

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ To turn it into a server, we use :class:`artiq.management.pc_rpc`. Import the fu
2323
and add a ``main`` function that is run when the program is executed: ::
2424

2525
def main():
26-
simple_server_loop(Hello(), "hello", "::1", 7777)
26+
simple_server_loop({"hello": Hello()}, "::1", 7777)
2727

2828
if __name__ == "__main__":
2929
main()
@@ -49,10 +49,10 @@ and verify that you can connect to the TCP port: ::
4949

5050
:tip: Use the key combination Ctrl-AltGr-9 to get the ``telnet>`` prompt, and enter ``close`` to quit Telnet. Quit the controller with Ctrl-C.
5151

52-
Also verify that you can get the type of the server (the "hello" string passed to ``simple_server_loop``) using the ``artiq_ctlid.py`` program from the ARTIQ front-end tools: ::
52+
Also verify that a target (service) named "hello" (as passed in the first argument to ``simple_server_loop``) exists using the ``artiq_ctlid.py`` program from the ARTIQ front-end tools: ::
5353

5454
$ artiq_ctlid.py ::1 7777
55-
Type: hello
55+
Target(s): hello
5656

5757
The client
5858
----------

‎frontend/artiq_client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def init_periodic(x):
144144
_run_subscriber(args.server, args.port, subscriber)
145145
else:
146146
port = 8888 if args.port is None else args.port
147-
remote = Client(args.server, port, "schedule_control")
147+
remote = Client(args.server, port, "master_schedule")
148148
try:
149149
globals()["_action_" + args.action](remote, args)
150150
finally:

‎frontend/artiq_ctlid.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ def main():
1919
args = _get_args()
2020
remote = Client(args.server, args.port, None)
2121
try:
22-
ident = remote.get_rpc_id()
22+
target_names, id_parameters = remote.get_rpc_id()
2323
finally:
2424
remote.close_rpc()
25-
print("Type: " + ident["type"])
26-
if "parameters" in ident:
27-
print("Parameters: " + ident["parameters"])
25+
print("Target(s): " + ", ".join(target_names))
26+
if id_parameters is not None:
27+
print("Parameters: " + id_parameters)
2828

2929
if __name__ == "__main__":
3030
main()

‎frontend/artiq_gui.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ def _get_args():
1616
"-s", "--server", default="::1",
1717
help="hostname or IP of the master to connect to")
1818
parser.add_argument(
19-
"--port-schedule-control", default=8888, type=int,
20-
help="TCP port to connect to for schedule control")
19+
"--port-notify", default=8887, type=int,
20+
help="TCP port to connect to for notifications")
2121
parser.add_argument(
22-
"--port-schedule-notify", default=8887, type=int,
23-
help="TCP port to connect to for schedule notifications")
22+
"--port-control", default=8888, type=int,
23+
help="TCP port to connect to for control")
2424
return parser.parse_args()
2525

2626

@@ -39,7 +39,7 @@ def main():
3939
parameters_win.show_all()
4040

4141
loop.run_until_complete(scheduler_win.sub_connect(
42-
args.server, args.port_schedule_notify))
42+
args.server, args.port_notify))
4343
try:
4444
loop.run_forever()
4545
finally:

‎frontend/artiq_master.py

+14-12
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ def _get_args():
1616
"--bind", default="::1",
1717
help="hostname or IP address to bind to")
1818
parser.add_argument(
19-
"--port-schedule-control", default=8888, type=int,
20-
help="TCP port to listen to for schedule control")
19+
"--port-notify", default=8887, type=int,
20+
help="TCP port to listen to for notifications")
2121
parser.add_argument(
22-
"--port-schedule-notify", default=8887, type=int,
23-
help="TCP port to listen to for schedule notifications")
22+
"--port-control", default=8888, type=int,
23+
help="TCP port to listen to for control")
2424
return parser.parse_args()
2525

2626

@@ -38,18 +38,20 @@ def main():
3838
loop.run_until_complete(scheduler.start())
3939
atexit.register(lambda: loop.run_until_complete(scheduler.stop()))
4040

41-
schedule_control = Server(scheduler, "schedule_control")
42-
loop.run_until_complete(schedule_control.start(
43-
args.bind, args.port_schedule_control))
44-
atexit.register(lambda: loop.run_until_complete(schedule_control.stop()))
41+
server_control = Server({
42+
"master_schedule": scheduler
43+
})
44+
loop.run_until_complete(server_control.start(
45+
args.bind, args.port_control))
46+
atexit.register(lambda: loop.run_until_complete(server_control.stop()))
4547

46-
schedule_notify = Publisher({
48+
server_notify = Publisher({
4749
"queue": scheduler.queue,
4850
"periodic": scheduler.periodic
4951
})
50-
loop.run_until_complete(schedule_notify.start(
51-
args.bind, args.port_schedule_notify))
52-
atexit.register(lambda: loop.run_until_complete(schedule_notify.stop()))
52+
loop.run_until_complete(server_notify.start(
53+
args.bind, args.port_notify))
54+
atexit.register(lambda: loop.run_until_complete(server_notify.stop()))
5355

5456
loop.run_forever()
5557

‎frontend/lda_controller.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def main():
2323
else:
2424
lda = Lda(args.serial, args.device)
2525

26-
simple_server_loop(lda, "lda",
26+
simple_server_loop({"lda": lda},
2727
args.bind, args.port)
2828

2929
if __name__ == "__main__":

‎frontend/pdq2_controller.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ def main():
3232

3333
dev = Pdq2(serial=args.serial)
3434
try:
35-
simple_server_loop(dev, "pdq2", args.bind, args.port,
36-
id_parameters="serial="+str(args.serial))
35+
simple_server_loop({"pdq2": dev}, args.bind, args.port,
36+
id_parameters="serial=" + str(args.serial))
3737
finally:
3838
dev.close()
3939

0 commit comments

Comments
 (0)
Please sign in to comment.