Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: a46d3ff3663e
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 9dd7ea0bcd9a
Choose a head ref
  • 2 commits
  • 3 files changed
  • 1 contributor

Commits on May 22, 2016

  1. Copy the full SHA
    114b305 View commit details

Commits on May 23, 2016

  1. Copy the full SHA
    9dd7ea0 View commit details
Showing with 104 additions and 3 deletions.
  1. +2 −2 artiq/devices/ctlmgr.py
  2. +101 −0 artiq/protocols/broadcast.py
  3. +1 −1 artiq/protocols/sync_struct.py
4 changes: 2 additions & 2 deletions artiq/devices/ctlmgr.py
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ def __init__(self, name, ddb_entry):
self.retry_timer_cur = self.retry_timer
self.retry_now = Condition()
self.process = None
self.launch_task = asyncio.Task(self.launcher())
self.launch_task = asyncio.ensure_future(self.launcher())

async def end(self):
self.launch_task.cancel()
@@ -160,7 +160,7 @@ def __init__(self):
self.active_or_queued = set()
self.queue = asyncio.Queue()
self.active = dict()
self.process_task = asyncio.Task(self._process())
self.process_task = asyncio.ensure_future(self._process())

async def _process(self):
while True:
101 changes: 101 additions & 0 deletions artiq/protocols/broadcast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import asyncio

from artiq.protocols import pyon
from artiq.protocols.asyncio_server import AsyncioServer


# Handshake line of the broadcast protocol: Receiver.connect() writes it
# first, and Broadcaster._handle_connection_cr() drops any connection whose
# first line differs from it.
_init_string = b"ARTIQ broadcast\n"


class Receiver:
    """Client side of the broadcast protocol.

    After connecting, the receiver sends the init string followed by its
    channel ``name``, then decodes every line it receives with PYON and
    hands the resulting object to each notification callback.
    """

    def __init__(self, name, notify_cb):
        """
        :param name: channel name, sent to the server right after the
            init string.
        :param notify_cb: a callable, or a list of callables; each one is
            invoked with every decoded object. A list is stored as-is
            (not copied).
        """
        self.name = name
        if not isinstance(notify_cb, list):
            notify_cb = [notify_cb]
        self.notify_cbs = notify_cb

    async def connect(self, host, port):
        """Open the connection, send the handshake and start receiving.

        The stream reader is created with a 4 MiB limit. If anything fails
        after the connection is open, the writer is closed, the stream
        attributes are removed and the exception is re-raised.
        """
        self.reader, self.writer = \
            await asyncio.open_connection(host, port, limit=4*1024*1024)
        try:
            self.writer.write(_init_string)
            self.writer.write((self.name + "\n").encode())
            self.receive_task = asyncio.ensure_future(self._receive_cr())
        except:
            # Catch-all on purpose: clean up on any failure (including
            # cancellation), then propagate the exception unchanged.
            self.writer.close()
            del self.reader
            del self.writer
            raise

    async def close(self):
        """Cancel the receive task, wait for it to end, and close the stream.

        The writer is closed and the stream attributes are removed even if
        the receive task terminated with an error.
        """
        try:
            self.receive_task.cancel()
            try:
                await asyncio.wait_for(self.receive_task, None)
            except asyncio.CancelledError:
                # Expected outcome of the cancel() above.
                pass
        finally:
            self.writer.close()
            del self.reader
            del self.writer

    async def _receive_cr(self):
        """Read lines until EOF, dispatching each decoded object."""
        while True:
            line = await self.reader.readline()
            if not line:
                # Empty read: the peer closed the connection.
                return
            obj = pyon.decode(line.decode())

            for notify_cb in self.notify_cbs:
                notify_cb(obj)


class Broadcaster(AsyncioServer):
    """Server side of the broadcast protocol.

    Each connected client subscribes to one channel name and gets its own
    bounded queue; :meth:`broadcast` pushes an encoded line into the queue
    of every client subscribed to the given name.
    """

    def __init__(self, maxbuf=1024):
        """
        :param maxbuf: maximum number of pending lines queued per
            recipient; further lines are dropped while the queue is full.
        """
        # NOTE(review): maxbuf is kept local and not forwarded to
        # AsyncioServer.__init__ -- confirm the base constructor takes no
        # arguments.
        AsyncioServer.__init__(self)
        self._maxbuf = maxbuf
        # Maps channel name -> set of asyncio.Queue, one per connection.
        self._recipients = dict()

    async def _handle_connection_cr(self, reader, writer):
        """Serve one client: check the handshake, then stream queued lines.

        The connection is dropped silently if the first line is not the
        init string or if no channel name follows.
        """
        try:
            line = await reader.readline()
            if line != _init_string:
                return

            line = await reader.readline()
            if not line:
                return
            # Strip the trailing newline terminating the name.
            name = line.decode()[:-1]

            queue = asyncio.Queue(self._maxbuf)
            self._recipients.setdefault(name, set()).add(queue)
            try:
                while True:
                    line = await queue.get()
                    writer.write(line)
                    # raise exception on connection error
                    await writer.drain()
            finally:
                # Unregister this connection; forget the channel once its
                # last recipient is gone.
                self._recipients[name].remove(queue)
                if not self._recipients[name]:
                    del self._recipients[name]
        except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError):
            # receivers disconnecting are a normal occurrence
            pass
        finally:
            writer.close()

    def broadcast(self, name, obj):
        """Queue ``obj`` (PYON-encoded, one line) for every recipient of
        ``name``.

        Does nothing when nobody is subscribed to ``name``. A recipient
        whose queue is full silently misses the message.
        """
        if name in self._recipients:
            # Encode once, outside the per-recipient loop.
            line = pyon.encode(obj) + "\n"
            line = line.encode()
            for recipient in self._recipients[name]:
                try:
                    recipient.put_nowait(line)
                except asyncio.QueueFull:
                    # do not log as logs may be redirected to a Broadcaster
                    pass
2 changes: 1 addition & 1 deletion artiq/protocols/sync_struct.py
Original file line number Diff line number Diff line change
@@ -69,7 +69,7 @@ async def connect(self, host, port, before_receive_cb=None):
before_receive_cb()
self.writer.write(_init_string)
self.writer.write((self.notifier_name + "\n").encode())
self.receive_task = asyncio.Task(self._receive_cr())
self.receive_task = asyncio.ensure_future(self._receive_cr())
except:
self.writer.close()
del self.reader