@@ -16,19 +16,16 @@ async def readline(self):
16
16
async def read(self, n):
    """Await ``self.reader.read(n)`` and hand back whatever it produces."""
    chunk = await self.reader.read(n)
    return chunk
18
18
19
- def close (self ):
20
- self .writer .close ()
21
-
22
19
23
20
if os .name != "nt" :
24
21
async def _fds_to_asyncio (rfd , wfd , loop ):
25
22
reader = asyncio .StreamReader (loop = loop )
26
23
reader_protocol = asyncio .StreamReaderProtocol (reader , loop = loop )
27
24
28
25
wf = open (wfd , "wb" , 0 )
29
- transport , protocol = await loop .connect_write_pipe (
26
+ transport , _ = await loop .connect_write_pipe (
30
27
FlowControlMixin , wf )
31
- writer = asyncio .StreamWriter (transport , protocol ,
28
+ writer = asyncio .StreamWriter (transport , reader_protocol ,
32
29
None , loop )
33
30
34
31
rf = open (rfd , "rb" , 0 )
@@ -45,6 +42,10 @@ def __init__(self):
45
42
def get_address(self):
    """Return ``c_rfd`` and ``c_wfd`` joined by a comma, as one string."""
    fd_pair = (self.c_rfd, self.c_wfd)
    return "{},{}".format(*fd_pair)
47
44
45
async def _autoclose(self):
    """Wait for ``self.process`` to finish, then close ``self.writer``."""
    child = self.process
    await child.wait()
    # Only reached once wait() has returned.
    self.writer.close()
48
+
48
49
async def create_subprocess (self , * args , ** kwargs ):
49
50
loop = asyncio .get_event_loop ()
50
51
self .process = await asyncio .create_subprocess_exec (
@@ -54,6 +55,7 @@ async def create_subprocess(self, *args, **kwargs):
54
55
55
56
self .reader , self .writer = await _fds_to_asyncio (
56
57
self .p_rfd , self .p_wfd , loop )
58
+ asyncio .ensure_future (self ._autoclose ())
57
59
58
60
59
61
class AsyncioChildComm (_BaseIO ):
@@ -65,6 +67,9 @@ async def connect(self):
65
67
self .reader , self .writer = await _fds_to_asyncio (
66
68
int (rfd ), int (wfd ), asyncio .get_event_loop ())
67
69
70
def close(self):
    """Close this object's ``writer``."""
    stream_writer = self.writer
    stream_writer.close()
72
+
68
73
69
74
class ChildComm :
70
75
def __init__ (self , address ):
@@ -88,7 +93,10 @@ def close(self):
88
93
89
94
else : # windows
90
95
class AsyncioParentComm(_BaseIO):
    """``AsyncioParentComm`` variant defined in the ``else:  # windows`` branch."""

    async def _autoclose(self):
        """Wait for ``self.process`` to exit, then close ``self.writer``.

        NOTE(review): neither attribute is assigned in the visible part of
        this class -- confirm where they are set before relying on this.
        """
        child = self.process
        await child.wait()
        self.writer.close()
99
+
92
100
93
101
class AsyncioChildComm (_BaseIO ):
94
102
"""Requires ProactorEventLoop"""
@@ -100,9 +108,9 @@ async def connect(self):
100
108
self .reader = asyncio .StreamReader (loop = loop )
101
109
reader_protocol = asyncio .StreamReaderProtocol (
102
110
self .reader , loop = loop )
103
- transport , protocol = await loop .create_pipe_connection (
111
+ transport , _ = await loop .create_pipe_connection (
104
112
self .address , lambda : reader_protocol )
105
- self .writer = asyncio .StreamWriter (transport , protocol ,
113
+ self .writer = asyncio .StreamWriter (transport , reader_protocol ,
106
114
self .reader , loop )
107
115
108
116
class ChildComm :
0 commit comments