7
7
import time
8
8
from functools import partial
9
9
10
- from artiq .protocols import pyon
10
+ from artiq .protocols import pipe_ipc , pyon
11
11
from artiq .tools import asyncio_wait_or_cancel
12
12
13
13
@@ -47,7 +47,7 @@ def __init__(self, handlers=dict(), send_timeout=0.5):
47
47
48
48
self .rid = None
49
49
self .filename = None
50
- self .process = None
50
+ self .ipc = None
51
51
self .watchdogs = dict () # wid -> expiration (using time.monotonic)
52
52
53
53
self .io_lock = asyncio .Lock ()
@@ -77,10 +77,10 @@ async def _create_process(self, log_level):
77
77
try :
78
78
if self .closed .is_set ():
79
79
raise WorkerError ("Attempting to create process after close" )
80
- self .process = await asyncio .create_subprocess_exec (
80
+ self .ipc = pipe_ipc .AsyncioParentComm ()
81
+ await self .ipc .create_subprocess (
81
82
sys .executable , "-m" , "artiq.master.worker_impl" ,
82
- str (log_level ),
83
- stdout = subprocess .PIPE , stdin = subprocess .PIPE )
83
+ self .ipc .get_address (), str (log_level ))
84
84
finally :
85
85
self .io_lock .release ()
86
86
@@ -93,15 +93,15 @@ async def close(self, term_timeout=1.0):
93
93
self .closed .set ()
94
94
await self .io_lock .acquire ()
95
95
try :
96
- if self .process is None :
96
+ if self .ipc is None :
97
97
# Note the %s - self.rid can be None
98
98
logger .debug ("worker was not created (RID %s)" , self .rid )
99
99
return
100
- if self .process .returncode is not None :
100
+ if self .ipc . process .returncode is not None :
101
101
logger .debug ("worker already terminated (RID %s)" , self .rid )
102
- if self .process .returncode != 0 :
102
+ if self .ipc . process .returncode != 0 :
103
103
logger .warning ("worker finished with status code %d"
104
- " (RID %s)" , self .process .returncode ,
104
+ " (RID %s)" , self .ipc . process .returncode ,
105
105
self .rid )
106
106
return
107
107
obj = {"action" : "terminate" }
@@ -111,21 +111,21 @@ async def close(self, term_timeout=1.0):
111
111
logger .debug ("failed to send terminate command to worker"
112
112
" (RID %s), killing" , self .rid , exc_info = True )
113
113
try :
114
- self .process .kill ()
114
+ self .ipc . process .kill ()
115
115
except ProcessLookupError :
116
116
pass
117
- await self .process .wait ()
117
+ await self .ipc . process .wait ()
118
118
return
119
119
try :
120
- await asyncio .wait_for (self .process .wait (), term_timeout )
120
+ await asyncio .wait_for (self .ipc . process .wait (), term_timeout )
121
121
except asyncio .TimeoutError :
122
122
logger .debug ("worker did not exit by itself (RID %s), killing" ,
123
123
self .rid )
124
124
try :
125
- self .process .kill ()
125
+ self .ipc . process .kill ()
126
126
except ProcessLookupError :
127
127
pass
128
- await self .process .wait ()
128
+ await self .ipc . process .wait ()
129
129
else :
130
130
logger .debug ("worker exited by itself (RID %s)" , self .rid )
131
131
finally :
@@ -134,9 +134,8 @@ async def close(self, term_timeout=1.0):
134
134
async def _send (self , obj , cancellable = True ):
135
135
assert self .io_lock .locked ()
136
136
line = pyon .encode (obj )
137
- self .process .stdin .write (line .encode ())
138
- self .process .stdin .write ("\n " .encode ())
139
- ifs = [self .process .stdin .drain ()]
137
+ self .ipc .write ((line + "\n " ).encode ())
138
+ ifs = [self .ipc .drain ()]
140
139
if cancellable :
141
140
ifs .append (self .closed .wait ())
142
141
fs = await asyncio_wait_or_cancel (
@@ -153,7 +152,7 @@ async def _send(self, obj, cancellable=True):
153
152
async def _recv (self , timeout ):
154
153
assert self .io_lock .locked ()
155
154
fs = await asyncio_wait_or_cancel (
156
- [self .process . stdout .readline (), self .closed .wait ()],
155
+ [self .ipc .readline (), self .closed .wait ()],
157
156
timeout = timeout , return_when = asyncio .FIRST_COMPLETED )
158
157
if all (f .cancelled () for f in fs ):
159
158
raise WorkerTimeout ("Timeout receiving data from worker" )
0 commit comments