Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 83fd160614fb
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 3d9fc7a51f18
Choose a head ref
  • 3 commits
  • 3 files changed
  • 1 contributor

Commits on Jan 27, 2016

  1. Copy the full SHA
    dce2aac View commit details
  2. Copy the full SHA
    79c0488 View commit details
  3. Copy the full SHA
    3d9fc7a View commit details
Showing with 94 additions and 12 deletions.
  1. +2 −6 artiq/protocols/asyncio_server.py
  2. +92 −4 artiq/protocols/pipe_ipc.py
  3. +0 −2 artiq/test/pipe_ipc.py
8 changes: 2 additions & 6 deletions artiq/protocols/asyncio_server.py
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ class AsyncioServer:
Users of this class must derive from it and define the
``_handle_connection_cr`` method and coroutine.
"""
def __init__(self):
self._client_tasks = set()
@@ -23,15 +22,12 @@ async def start(self, host, port):
:param host: Bind address of the server (see ``asyncio.start_server``
from the Python standard library).
:param port: TCP port to bind to.
"""
self.server = await asyncio.start_server(self._handle_connection,
host, port)

async def stop(self):
"""Stops the server.
"""
"""Stops the server."""
wait_for = copy(self._client_tasks)
for task in self._client_tasks:
task.cancel()
@@ -48,6 +44,6 @@ def _client_done(self, task):
self._client_tasks.remove(task)

def _handle_connection(self, reader, writer):
task = asyncio.Task(self._handle_connection_cr(reader, writer))
task = asyncio.ensure_future(self._handle_connection_cr(reader, writer))
self._client_tasks.add(task)
task.add_done_callback(self._client_done)
96 changes: 92 additions & 4 deletions artiq/protocols/pipe_ipc.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,9 @@
from asyncio.streams import FlowControlMixin


__all__ = ["AsyncioParentComm", "AsyncioChildComm", "ChildComm"]


class _BaseIO:
def write(self, data):
self.writer.write(data)
@@ -92,10 +95,78 @@ def close(self):


else: # windows
class AsyncioParentComm(_BaseIO):
import itertools


_pipe_count = itertools.count()


class AsyncioParentComm:
"""Requires ProactorEventLoop"""
def __init__(self):
# We cannot use anonymous pipes on Windows, because we do not know
# in advance if the child process wants a handle open in overlapped
# mode or not.
self.address = "\\\\.\\pipe\\artiq-{}-{}".format(os.getpid(),
next(_pipe_count))
self.server = None
self.ready = asyncio.Event()
self.write_buffer = b""

def get_address(self):
return self.address

async def _autoclose(self):
await self.process.wait()
self.writer.close()
if self.server is not None:
self.server[0].close()
self.server = None
if self.ready.is_set():
self.writer.close()

async def create_subprocess(self, *args, **kwargs):
loop = asyncio.get_event_loop()

def factory():
reader = asyncio.StreamReader(loop=loop)
protocol = asyncio.StreamReaderProtocol(reader,
self._child_connected,
loop=loop)
return protocol
self.server = await loop.start_serving_pipe(
factory, self.address)

self.process = await asyncio.create_subprocess_exec(
*args, **kwargs)
asyncio.ensure_future(self._autoclose())

def _child_connected(self, reader, writer):
self.server[0].close()
self.server = None
self.reader = reader
self.writer = writer
if self.write_buffer:
self.writer.write(self.write_buffer)
self.write_buffer = b""
self.ready.set()

def write(self, data):
if self.ready.is_set():
self.writer.write(data)
else:
self.write_buffer += data

async def drain(self):
await self.ready.wait()
await self.writer.drain()

async def readline(self):
await self.ready.wait()
return await self.reader.readline()

async def read(self, n):
await self.ready.wait()
return await self.reader.read(n)


class AsyncioChildComm(_BaseIO):
@@ -109,9 +180,26 @@ async def connect(self):
reader_protocol = asyncio.StreamReaderProtocol(
self.reader, loop=loop)
transport, _ = await loop.create_pipe_connection(
self.address, lambda: reader_protocol)
lambda: reader_protocol, self.address)
self.writer = asyncio.StreamWriter(transport, reader_protocol,
self.reader, loop)

def close(self):
self.writer.close()


class ChildComm:
pass
def __init__(self, address):
self.f = open(address, "a+b", 0)

def read(self, n):
return self.f.read(n)

def readline(self):
return self.f.readline()

def write(self, data):
return self.f.write(data)

def close(self):
self.f.close()
2 changes: 0 additions & 2 deletions artiq/test/pipe_ipc.py
Original file line number Diff line number Diff line change
@@ -6,8 +6,6 @@
from artiq.protocols import pipe_ipc


@unittest.skip("temporarily skip b/c buildbot is failing "
"and i would like packages")
class IPCCase(unittest.TestCase):
def setUp(self):
if os.name == "nt":