Skip to content

Commit f033810

Browse files
committed Dec 27, 2014
management: add sync_struct
1 parent f7232fd commit f033810

File tree

4 files changed

+191
-39
lines changed

4 files changed

+191
-39
lines changed
 

‎artiq/management/network.py

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import asyncio
2+
from copy import copy
3+
4+
5+
class AsyncioServer:
    """Generic TCP server based on asyncio.

    Users of this class must derive from it and define the
    ``_handle_connection_cr`` method as a coroutine. It is called with the
    ``reader`` and ``writer`` streams of every accepted connection and is
    run in its own task.

    """
    def __init__(self):
        # Tasks of the connections currently being served. Each task is
        # removed by ``_client_done`` once it has finished.
        self._client_tasks = set()

    @asyncio.coroutine
    def start(self, host, port):
        """Starts the server.

        The user must call ``stop`` to free resources properly after this
        method completes successfully.

        This method is a `coroutine`.

        :param host: Bind address of the server (see ``asyncio.start_server``
            from the Python standard library).
        :param port: TCP port to bind to.

        """
        self.server = yield from asyncio.start_server(self._handle_connection,
                                                      host, port)

    @asyncio.coroutine
    def stop(self):
        """Stops the server.

        Stops listening, cancels the tasks of the connections that are still
        being served and waits for them to terminate. An exception raised by
        a client task (other than its cancellation) propagates to the caller,
        but the server is closed in all cases.

        This method is a `coroutine`.

        """
        # Stop listening first: otherwise a connection accepted while we are
        # waiting below would get a task that is never cancelled.
        self.server.close()
        try:
            # Work on a snapshot, the done callbacks mutate the set while we
            # are waiting.
            pending = copy(self._client_tasks)
            for task in pending:
                task.cancel()
            for task in pending:
                try:
                    yield from asyncio.wait_for(task, None)
                except asyncio.CancelledError:
                    pass
        finally:
            # Always release the server, even if a client task failed.
            yield from self.server.wait_closed()
            del self.server

    def _client_done(self, task):
        # Done callback of the client tasks. ``discard`` so that a task that
        # is no longer registered cannot raise inside the event loop.
        self._client_tasks.discard(task)

    def _handle_connection(self, reader, writer):
        # Callback given to ``asyncio.start_server``: run the user-defined
        # coroutine in a task and keep track of it so ``stop`` can cancel it.
        task = asyncio.Task(self._handle_connection_cr(reader, writer))
        self._client_tasks.add(task)
        task.add_done_callback(self._client_done)

    @asyncio.coroutine
    def _handle_connection_cr(self, reader, writer):
        """Serves one client connection. Must be overridden by subclasses.

        :param reader: Stream reader of the connection.
        :param writer: Stream writer of the connection.

        """
        raise NotImplementedError

‎artiq/management/pc_rpc.py

+4-39
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import traceback
1818

1919
from artiq.management import pyon
20+
from artiq.management.network import AsyncioServer
2021

2122

2223
class RemoteError(Exception):
@@ -123,7 +124,7 @@ def proxy(*args, **kwargs):
123124
return proxy
124125

125126

126-
class Server:
127+
class Server(AsyncioServer):
127128
"""This class creates a TCP server that handles requests coming from
128129
``Client`` objects.
129130
@@ -142,49 +143,13 @@ class Server:
142143
143144
"""
144145
def __init__(self, target, id_type, id_parameters=None):
146+
AsyncioServer.__init__(self)
145147
self.target = target
146148
self.id_type = id_type
147149
self.id_parameters = id_parameters
148-
self._client_tasks = set()
149150

150151
@asyncio.coroutine
151-
def start(self, host, port):
152-
"""Starts the server.
153-
154-
The user must call ``stop`` to free resources properly after this
155-
method completes successfully.
156-
157-
This method is a `coroutine`.
158-
159-
:param host: Bind address of the server (see ``asyncio.start_server``
160-
from the Python standard library).
161-
:param port: TCP port to bind to.
162-
163-
"""
164-
self.server = yield from asyncio.start_server(self._handle_connection,
165-
host, port)
166-
167-
@asyncio.coroutine
168-
def stop(self):
169-
"""Stops the server.
170-
171-
"""
172-
for task in self._client_tasks:
173-
task.cancel()
174-
self.server.close()
175-
yield from self.server.wait_closed()
176-
del self.server
177-
178-
def _client_done(self, task):
179-
self._client_tasks.remove(task)
180-
181-
def _handle_connection(self, reader, writer):
182-
task = asyncio.Task(self._handle_connection_task(reader, writer))
183-
self._client_tasks.add(task)
184-
task.add_done_callback(self._client_done)
185-
186-
@asyncio.coroutine
187-
def _handle_connection_task(self, reader, writer):
152+
def _handle_connection_cr(self, reader, writer):
188153
try:
189154
line = yield from reader.readline()
190155
if line != _init_string:

‎artiq/management/sync_struct.py

+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import asyncio
2+
3+
from artiq.management import pyon
4+
from artiq.management.network import AsyncioServer
5+
6+
7+
_init_string = b"ARTIQ sync_struct\n"
8+
9+
10+
class Subscriber:
    """Receives a structure from a ``Publisher`` and keeps a local copy of
    it up to date by replaying the modifications sent by the publisher.

    :param target_builder: Called with the initial structure received in the
        ``init`` message; must return the object on which the subsequent
        ``append``, ``pop`` and ``delitem`` modifications are applied.
    :param error_cb: Called without arguments when the receive loop fails
        (including when the connection is closed by the remote end). It is
        not called when the subscriber is closed with ``close``.
    :param notify_cb: If not ``None``, called without arguments after each
        received message has been processed.

    """
    def __init__(self, target_builder, error_cb, notify_cb=None):
        self.target_builder = target_builder
        self.error_cb = error_cb
        self.notify_cb = notify_cb

    @asyncio.coroutine
    def connect(self, host, port):
        """Connects to the publisher and starts receiving in the background.

        The user must call ``close`` to free resources properly after this
        method completes successfully.

        This method is a `coroutine`.

        :param host: Host name or address of the publisher.
        :param port: TCP port of the publisher.

        """
        self._reader, self._writer = \
            yield from asyncio.open_connection(host, port)
        try:
            self._writer.write(_init_string)
            self._receive_task = asyncio.Task(self._receive_cr())
        except:
            # Do not leak the connection if the setup fails; the exception
            # is re-raised to the caller.
            self._writer.close()
            del self._reader
            del self._writer
            raise

    @asyncio.coroutine
    def close(self):
        """Stops receiving and closes the connection.

        This method is a `coroutine`.

        """
        try:
            self._receive_task.cancel()
            try:
                yield from asyncio.wait_for(self._receive_task, None)
            except asyncio.CancelledError:
                pass
        finally:
            self._writer.close()
            del self._reader
            del self._writer

    @asyncio.coroutine
    def _receive_cr(self):
        # Receive loop: decodes one message per line and applies it to the
        # local copy of the structure.
        try:
            target = None
            while True:
                line = yield from self._reader.readline()
                if not line:
                    # readline() returns an empty bytes object at EOF.
                    # Report it explicitly instead of relying on the decoder
                    # choking on an empty string.
                    raise EOFError("connection closed by publisher")
                obj = pyon.decode(line.decode())
                action = obj["action"]

                if action == "init":
                    target = self.target_builder(obj["struct"])
                elif action == "append":
                    target.append(obj["x"])
                elif action == "pop":
                    target.pop(obj["i"])
                elif action == "delitem":
                    target.__delitem__(obj["key"])
                if self.notify_cb is not None:
                    self.notify_cb()
        except asyncio.CancelledError:
            # Cancellation is how close() stops this task; it is not an
            # error and must not be reported through error_cb.
            raise
        except Exception:
            self.error_cb()
            raise
64+
65+
66+
class Publisher(AsyncioServer):
    """Server that sends a structure to its subscribers and then forwards
    to them every modification made through its ``append``, ``pop`` and
    ``__delitem__`` methods.

    :param backing_struct: The structure to publish. It is sent in full to
        each subscriber when it connects and is modified in place by the
        modification methods of this class.

    """
    def __init__(self, backing_struct):
        AsyncioServer.__init__(self)
        self.backing_struct = backing_struct
        # One queue of encoded messages per connected subscriber.
        self._recipients = set()

    @asyncio.coroutine
    def _handle_connection_cr(self, reader, writer):
        try:
            line = yield from reader.readline()
            if line != _init_string:
                # Not a sync_struct client, drop the connection.
                return

            obj = {"action": "init", "struct": self.backing_struct}
            line = pyon.encode(obj) + "\n"
            writer.write(line.encode())

            # There is no yield between sending the initial structure and
            # registering the queue, so no modification can be missed.
            queue = asyncio.Queue()
            self._recipients.add(queue)
            try:
                while True:
                    line = yield from queue.get()
                    writer.write(line)
                    # raise exception on connection error
                    yield from writer.drain()
            finally:
                self._recipients.remove(queue)
        except (ConnectionResetError, BrokenPipeError):
            # subscribers disconnecting are a normal occurrence; writing to
            # a closed peer can surface as either exception
            pass
        finally:
            writer.close()

    def _publish(self, obj):
        # Encode the message once and queue it for every subscriber.
        line = pyon.encode(obj) + "\n"
        line = line.encode()
        for recipient in self._recipients:
            recipient.put_nowait(line)

    # Backing struct modification methods.
    # All modifications must go through them!

    def append(self, x):
        """Appends ``x`` to the backing structure and notifies the
        subscribers."""
        self.backing_struct.append(x)
        self._publish({"action": "append", "x": x})

    def pop(self, i=-1):
        """Removes the element at index ``i`` (by default the last one) from
        the backing structure, notifies the subscribers and returns the
        removed element."""
        r = self.backing_struct.pop(i)
        self._publish({"action": "pop", "i": i})
        return r

    def __delitem__(self, key):
        """Deletes ``key`` from the backing structure and notifies the
        subscribers."""
        self.backing_struct.__delitem__(key)
        self._publish({"action": "delitem", "key": key})

‎doc/manual/management_reference.rst

+13
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,21 @@ Management reference
77
.. automodule:: artiq.management.pyon
88
:members:
99

10+
:mod:`artiq.management.network` module
11+
--------------------------------------
12+
13+
.. automodule:: artiq.management.network
14+
:members:
15+
16+
1017
:mod:`artiq.management.pc_rpc` module
1118
-------------------------------------
1219

1320
.. automodule:: artiq.management.pc_rpc
1421
:members:
22+
23+
:mod:`artiq.management.sync_struct` module
24+
------------------------------------------
25+
26+
.. automodule:: artiq.management.sync_struct
27+
:members:

0 commit comments

Comments
 (0)
Please sign in to comment.