-
Notifications
You must be signed in to change notification settings - Fork 201
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
f7232fd
commit f033810
Showing
4 changed files
with
191 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import asyncio | ||
from copy import copy | ||
|
||
|
||
class AsyncioServer: | ||
"""Generic TCP server based on asyncio. | ||
Users of this class must derive from it and define the | ||
``_handle_connection_cr`` method and coroutine. | ||
""" | ||
def __init__(self): | ||
self._client_tasks = set() | ||
|
||
@asyncio.coroutine | ||
def start(self, host, port): | ||
"""Starts the server. | ||
The user must call ``stop`` to free resources properly after this | ||
method completes successfully. | ||
This method is a `coroutine`. | ||
:param host: Bind address of the server (see ``asyncio.start_server`` | ||
from the Python standard library). | ||
:param port: TCP port to bind to. | ||
""" | ||
self.server = yield from asyncio.start_server(self._handle_connection, | ||
host, port) | ||
|
||
@asyncio.coroutine | ||
def stop(self): | ||
"""Stops the server. | ||
""" | ||
wait_for = copy(self._client_tasks) | ||
for task in self._client_tasks: | ||
task.cancel() | ||
for task in wait_for: | ||
try: | ||
yield from asyncio.wait_for(task, None) | ||
except asyncio.CancelledError: | ||
pass | ||
self.server.close() | ||
yield from self.server.wait_closed() | ||
del self.server | ||
|
||
def _client_done(self, task): | ||
self._client_tasks.remove(task) | ||
|
||
def _handle_connection(self, reader, writer): | ||
task = asyncio.Task(self._handle_connection_cr(reader, writer)) | ||
self._client_tasks.add(task) | ||
task.add_done_callback(self._client_done) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
import asyncio | ||
|
||
from artiq.management import pyon | ||
from artiq.management.network import AsyncioServer | ||
|
||
|
||
_init_string = b"ARTIQ sync_struct\n" | ||
|
||
|
||
class Subscriber: | ||
def __init__(self, target_builder, error_cb, notify_cb=None): | ||
self.target_builder = target_builder | ||
self.error_cb = error_cb | ||
self.notify_cb = notify_cb | ||
|
||
@asyncio.coroutine | ||
def connect(self, host, port): | ||
self._reader, self._writer = \ | ||
yield from asyncio.open_connection(host, port) | ||
try: | ||
self._writer.write(_init_string) | ||
self._receive_task = asyncio.Task(self._receive_cr()) | ||
except: | ||
self._writer.close() | ||
del self._reader | ||
del self._writer | ||
raise | ||
|
||
@asyncio.coroutine | ||
def close(self): | ||
try: | ||
self._receive_task.cancel() | ||
try: | ||
yield from asyncio.wait_for(self._receive_task, None) | ||
except asyncio.CancelledError: | ||
pass | ||
finally: | ||
self._writer.close() | ||
del self._reader | ||
del self._writer | ||
|
||
@asyncio.coroutine | ||
def _receive_cr(self): | ||
try: | ||
target = None | ||
while True: | ||
line = yield from self._reader.readline() | ||
obj = pyon.decode(line.decode()) | ||
action = obj["action"] | ||
|
||
if action == "init": | ||
target = self.target_builder(obj["struct"]) | ||
elif action == "append": | ||
target.append(obj["x"]) | ||
elif action == "pop": | ||
target.pop(obj["i"]) | ||
elif action == "delitem": | ||
target.__delitem__(obj["key"]) | ||
if self.notify_cb is not None: | ||
self.notify_cb() | ||
except: | ||
self.error_cb() | ||
raise | ||
|
||
|
||
class Publisher(AsyncioServer): | ||
def __init__(self, backing_struct): | ||
AsyncioServer.__init__(self) | ||
self.backing_struct = backing_struct | ||
self._recipients = set() | ||
|
||
@asyncio.coroutine | ||
def _handle_connection_cr(self, reader, writer): | ||
try: | ||
line = yield from reader.readline() | ||
if line != _init_string: | ||
return | ||
|
||
obj = {"action": "init", "struct": self.backing_struct} | ||
line = pyon.encode(obj) + "\n" | ||
writer.write(line.encode()) | ||
|
||
queue = asyncio.Queue() | ||
self._recipients.add(queue) | ||
try: | ||
while True: | ||
line = yield from queue.get() | ||
writer.write(line) | ||
# raise exception on connection error | ||
yield from writer.drain() | ||
finally: | ||
self._recipients.remove(queue) | ||
except ConnectionResetError: | ||
# subscribers disconnecting are a normal occurence | ||
pass | ||
finally: | ||
writer.close() | ||
|
||
def _publish(self, obj): | ||
line = pyon.encode(obj) + "\n" | ||
line = line.encode() | ||
for recipient in self._recipients: | ||
recipient.put_nowait(line) | ||
|
||
# Backing struct modification methods. | ||
# All modifications must go through them! | ||
|
||
def append(self, x): | ||
self.backing_struct.append(x) | ||
self._publish({"action": "append", "x": x}) | ||
|
||
def pop(self, i=-1): | ||
r = self.backing_struct.pop(i) | ||
self._publish({"action": "pop", "i": i}) | ||
return r | ||
|
||
def __delitem__(self, key): | ||
self.backing_struct.__delitem__(key) | ||
self._publish({"action": "delitem", "key": key}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters