Skip to content

Commit 6383253

Browse files
committedJan 26, 2016
protocols/pipe_ipc: autoclose pipe fds on process exit in AsyncioParentComm
1 parent 7a9864b commit 6383253

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed
 

Diff for: ‎artiq/protocols/pipe_ipc.py

+16-8
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,16 @@ async def readline(self):
1616
async def read(self, n):
1717
return await self.reader.read(n)
1818

19-
def close(self):
20-
self.writer.close()
21-
2219

2320
if os.name != "nt":
2421
async def _fds_to_asyncio(rfd, wfd, loop):
2522
reader = asyncio.StreamReader(loop=loop)
2623
reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
2724

2825
wf = open(wfd, "wb", 0)
29-
transport, protocol = await loop.connect_write_pipe(
26+
transport, _ = await loop.connect_write_pipe(
3027
FlowControlMixin, wf)
31-
writer = asyncio.StreamWriter(transport, protocol,
28+
writer = asyncio.StreamWriter(transport, reader_protocol,
3229
None, loop)
3330

3431
rf = open(rfd, "rb", 0)
@@ -45,6 +42,10 @@ def __init__(self):
4542
def get_address(self):
4643
return "{},{}".format(self.c_rfd, self.c_wfd)
4744

45+
async def _autoclose(self):
46+
await self.process.wait()
47+
self.writer.close()
48+
4849
async def create_subprocess(self, *args, **kwargs):
4950
loop = asyncio.get_event_loop()
5051
self.process = await asyncio.create_subprocess_exec(
@@ -54,6 +55,7 @@ async def create_subprocess(self, *args, **kwargs):
5455

5556
self.reader, self.writer = await _fds_to_asyncio(
5657
self.p_rfd, self.p_wfd, loop)
58+
asyncio.ensure_future(self._autoclose())
5759

5860

5961
class AsyncioChildComm(_BaseIO):
@@ -65,6 +67,9 @@ async def connect(self):
6567
self.reader, self.writer = await _fds_to_asyncio(
6668
int(rfd), int(wfd), asyncio.get_event_loop())
6769

70+
def close(self):
71+
self.writer.close()
72+
6873

6974
class ChildComm:
7075
def __init__(self, address):
@@ -88,7 +93,10 @@ def close(self):
8893

8994
else: # windows
9095
class AsyncioParentComm(_BaseIO):
91-
pass
96+
async def _autoclose(self):
97+
await self.process.wait()
98+
self.writer.close()
99+
92100

93101
class AsyncioChildComm(_BaseIO):
94102
"""Requires ProactorEventLoop"""
@@ -100,9 +108,9 @@ async def connect(self):
100108
self.reader = asyncio.StreamReader(loop=loop)
101109
reader_protocol = asyncio.StreamReaderProtocol(
102110
self.reader, loop=loop)
103-
transport, protocol = await loop.create_pipe_connection(
111+
transport, _ = await loop.create_pipe_connection(
104112
self.address, lambda: reader_protocol)
105-
self.writer = asyncio.StreamWriter(transport, protocol,
113+
self.writer = asyncio.StreamWriter(transport, reader_protocol,
106114
self.reader, loop)
107115

108116
class ChildComm:

Diff for: ‎artiq/test/pipe_ipc.py

-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ async def _coro_test(self, child_asyncio):
3030
self.assertEqual(int(s), i+1)
3131
ipc.write(b"-1\n")
3232
await ipc.process.wait()
33-
ipc.close()
3433

3534
def test_blocking(self):
3635
self.loop.run_until_complete(self._coro_test(False))

0 commit comments

Comments
 (0)