Skip to content

Commit a583a92

Browse files
committed Jan 26, 2016
worker: use pipe_ipc (no log)
1 parent 6383253 commit a583a92

File tree

2 files changed

+27
-43
lines changed

2 files changed

+27
-43
lines changed
 

Diff for: artiq/master/worker.py

+17 -18
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77
import time
88
from functools import partial
99

10-
from artiq.protocols import pyon
10+
from artiq.protocols import pipe_ipc, pyon
1111
from artiq.tools import asyncio_wait_or_cancel
1212

1313

@@ -47,7 +47,7 @@ def __init__(self, handlers=dict(), send_timeout=0.5):
4747

4848
self.rid = None
4949
self.filename = None
50-
self.process = None
50+
self.ipc = None
5151
self.watchdogs = dict() # wid -> expiration (using time.monotonic)
5252

5353
self.io_lock = asyncio.Lock()
@@ -77,10 +77,10 @@ async def _create_process(self, log_level):
7777
try:
7878
if self.closed.is_set():
7979
raise WorkerError("Attempting to create process after close")
80-
self.process = await asyncio.create_subprocess_exec(
80+
self.ipc = pipe_ipc.AsyncioParentComm()
81+
await self.ipc.create_subprocess(
8182
sys.executable, "-m", "artiq.master.worker_impl",
82-
str(log_level),
83-
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
83+
self.ipc.get_address(), str(log_level))
8484
finally:
8585
self.io_lock.release()
8686

@@ -93,15 +93,15 @@ async def close(self, term_timeout=1.0):
9393
self.closed.set()
9494
await self.io_lock.acquire()
9595
try:
96-
if self.process is None:
96+
if self.ipc is None:
9797
# Note the %s - self.rid can be None
9898
logger.debug("worker was not created (RID %s)", self.rid)
9999
return
100-
if self.process.returncode is not None:
100+
if self.ipc.process.returncode is not None:
101101
logger.debug("worker already terminated (RID %s)", self.rid)
102-
if self.process.returncode != 0:
102+
if self.ipc.process.returncode != 0:
103103
logger.warning("worker finished with status code %d"
104-
" (RID %s)", self.process.returncode,
104+
" (RID %s)", self.ipc.process.returncode,
105105
self.rid)
106106
return
107107
obj = {"action": "terminate"}
@@ -111,21 +111,21 @@ async def close(self, term_timeout=1.0):
111111
logger.debug("failed to send terminate command to worker"
112112
" (RID %s), killing", self.rid, exc_info=True)
113113
try:
114-
self.process.kill()
114+
self.ipc.process.kill()
115115
except ProcessLookupError:
116116
pass
117-
await self.process.wait()
117+
await self.ipc.process.wait()
118118
return
119119
try:
120-
await asyncio.wait_for(self.process.wait(), term_timeout)
120+
await asyncio.wait_for(self.ipc.process.wait(), term_timeout)
121121
except asyncio.TimeoutError:
122122
logger.debug("worker did not exit by itself (RID %s), killing",
123123
self.rid)
124124
try:
125-
self.process.kill()
125+
self.ipc.process.kill()
126126
except ProcessLookupError:
127127
pass
128-
await self.process.wait()
128+
await self.ipc.process.wait()
129129
else:
130130
logger.debug("worker exited by itself (RID %s)", self.rid)
131131
finally:
@@ -134,9 +134,8 @@ async def close(self, term_timeout=1.0):
134134
async def _send(self, obj, cancellable=True):
135135
assert self.io_lock.locked()
136136
line = pyon.encode(obj)
137-
self.process.stdin.write(line.encode())
138-
self.process.stdin.write("\n".encode())
139-
ifs = [self.process.stdin.drain()]
137+
self.ipc.write((line + "\n").encode())
138+
ifs = [self.ipc.drain()]
140139
if cancellable:
141140
ifs.append(self.closed.wait())
142141
fs = await asyncio_wait_or_cancel(
@@ -153,7 +152,7 @@ async def _send(self, obj, cancellable=True):
153152
async def _recv(self, timeout):
154153
assert self.io_lock.locked()
155154
fs = await asyncio_wait_or_cancel(
156-
[self.process.stdout.readline(), self.closed.wait()],
155+
[self.ipc.readline(), self.closed.wait()],
157156
timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
158157
if all(f.cancelled() for f in fs):
159158
raise WorkerTimeout("Timeout receiving data from worker")

Diff for: artiq/master/worker_impl.py

+10 -25
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66
from collections import OrderedDict
77

88
import artiq
9-
from artiq.protocols import pyon
9+
from artiq.protocols import pipe_ipc, pyon
1010
from artiq.tools import file_import
1111
from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output
1212
from artiq.language.environment import is_experiment
@@ -15,16 +15,16 @@
1515
from artiq import __version__ as artiq_version
1616

1717

18+
ipc = None
19+
1820
def get_object():
19-
line = sys.__stdin__.readline()
21+
line = ipc.readline().decode()
2022
return pyon.decode(line)
2123

2224

2325
def put_object(obj):
2426
ds = pyon.encode(obj)
25-
sys.__stdout__.write(ds)
26-
sys.__stdout__.write("\n")
27-
sys.__stdout__.flush()
27+
ipc.write((ds + "\n").encode())
2828

2929

3030
class ParentActionError(Exception):
@@ -53,23 +53,6 @@ def parent_action(*args, **kwargs):
5353
return parent_action
5454

5555

56-
class LogForwarder:
57-
def __init__(self):
58-
self.buffer = ""
59-
60-
to_parent = staticmethod(make_parent_action("log"))
61-
62-
def write(self, data):
63-
self.buffer += data
64-
while "\n" in self.buffer:
65-
i = self.buffer.index("\n")
66-
self.to_parent(self.buffer[:i])
67-
self.buffer = self.buffer[i+1:]
68-
69-
def flush(self):
70-
pass
71-
72-
7356
class ParentDeviceDB:
7457
get_device_db = make_parent_action("get_device_db")
7558
get = make_parent_action("get_device", KeyError)
@@ -202,9 +185,10 @@ def render_diagnostic(self, diagnostic):
202185

203186

204187
def main():
205-
sys.stdout = LogForwarder()
206-
sys.stderr = LogForwarder()
207-
logging.basicConfig(level=int(sys.argv[1]))
188+
global ipc
189+
190+
logging.basicConfig(level=int(sys.argv[2]))
191+
ipc = pipe_ipc.ChildComm(sys.argv[1])
208192

209193
start_time = None
210194
rid = None
@@ -277,6 +261,7 @@ def main():
277261
put_object({"action": "exception"})
278262
finally:
279263
device_mgr.close_devices()
264+
ipc.close()
280265

281266

282267
if __name__ == "__main__":

0 commit comments

Comments
 (0)
Please sign in to comment.