Skip to content

Commit 055573a

Browse files
committed Feb 14, 2016
Merge branch 'subprocess-termination'
* subprocess-termination:
  test_ctlmgr: fix
  Client: add note about timeout sideeffects
  hardware_testbench: full shutdown sequence for controllers
  worker: flake8 style cleanup
  ctlmgr: fix import
  pc_rpc.Client: support socket timeouts
  subprocesses: unify termination logic
2 parents 6ee86d9 + 6434a9c commit 055573a

File tree

5 files changed

+115
-57
lines changed

5 files changed

+115
-57
lines changed
 

Diff for: ‎artiq/devices/ctlmgr.py

+33-22
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import subprocess
44
import shlex
55
import socket
6+
import os
67

78
from artiq.protocols.sync_struct import Subscriber
89
from artiq.protocols.pc_rpc import AsyncioClient
@@ -106,30 +107,40 @@ async def launcher(self):
106107
await self._terminate()
107108

108109
async def _terminate(self):
109-
logger.info("Terminating controller %s", self.name)
110-
if self.process is not None and self.process.returncode is None:
110+
if self.process is None or self.process.returncode is not None:
111+
logger.info("Controller %s already terminated", self.name)
112+
return
113+
logger.debug("Terminating controller %s", self.name)
114+
try:
115+
await asyncio.wait_for(self.call("terminate"), self.term_timeout)
116+
await asyncio.wait_for(self.process.wait(), self.term_timeout)
117+
logger.info("Controller %s terminated", self.name)
118+
return
119+
except:
120+
logger.warning("Controller %s did not exit on request, "
121+
"ending the process", self.name)
122+
if os.name != "nt":
111123
try:
112-
await asyncio.wait_for(self.call("terminate"),
113-
self.term_timeout)
114-
except:
115-
logger.warning("Controller %s did not respond to terminate "
116-
"command, killing", self.name)
117-
try:
118-
self.process.kill()
119-
except ProcessLookupError:
120-
pass
124+
self.process.terminate()
125+
except ProcessLookupError:
126+
pass
121127
try:
122-
await asyncio.wait_for(self.process.wait(),
123-
self.term_timeout)
124-
except:
125-
logger.warning("Controller %s failed to exit, killing",
126-
self.name)
127-
try:
128-
self.process.kill()
129-
except ProcessLookupError:
130-
pass
131-
await self.process.wait()
132-
logger.debug("Controller %s terminated", self.name)
128+
await asyncio.wait_for(self.process.wait(), self.term_timeout)
129+
logger.info("Controller process %s terminated", self.name)
130+
return
131+
except asyncio.TimeoutError:
132+
logger.warning("Controller process %s did not terminate, "
133+
"killing", self.name)
134+
try:
135+
self.process.kill()
136+
except ProcessLookupError:
137+
pass
138+
try:
139+
await asyncio.wait_for(self.process.wait(), self.term_timeout)
140+
logger.info("Controller process %s killed", self.name)
141+
return
142+
except asyncio.TimeoutError:
143+
logger.warning("Controller process %s failed to die", self.name)
133144

134145

135146
def get_ip_addresses(host):

Diff for: ‎artiq/master/worker.py

+29-19
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import subprocess
66
import traceback
77
import time
8-
from functools import partial
98

109
from artiq.protocols import pipe_ipc, pyon
1110
from artiq.protocols.logging import LogParser
@@ -115,30 +114,38 @@ async def close(self, term_timeout=1.0):
115114
" (RID %s)", self.ipc.process.returncode,
116115
self.rid)
117116
return
118-
obj = {"action": "terminate"}
119117
try:
120-
await self._send(obj, cancellable=False)
118+
await self._send({"action": "terminate"}, cancellable=False)
119+
await asyncio.wait_for(self.ipc.process.wait(), term_timeout)
120+
logger.debug("worker exited on request (RID %s)", self.rid)
121+
return
121122
except:
122-
logger.debug("failed to send terminate command to worker"
123-
" (RID %s), killing", self.rid, exc_info=True)
123+
logger.debug("worker failed to exit on request"
124+
" (RID %s), ending the process", self.rid,
125+
exc_info=True)
126+
if os.name != "nt":
124127
try:
125-
self.ipc.process.kill()
128+
self.ipc.process.terminate()
126129
except ProcessLookupError:
127130
pass
128-
await self.ipc.process.wait()
129-
return
131+
try:
132+
await asyncio.wait_for(self.ipc.process.wait(),
133+
term_timeout)
134+
logger.debug("worker terminated (RID %s)", self.rid)
135+
return
136+
except asyncio.TimeoutError:
137+
logger.warning(
138+
"worker did not terminate (RID %s), killing", self.rid)
139+
try:
140+
self.ipc.process.kill()
141+
except ProcessLookupError:
142+
pass
130143
try:
131144
await asyncio.wait_for(self.ipc.process.wait(), term_timeout)
145+
logger.debug("worker killed (RID %s)", self.rid)
146+
return
132147
except asyncio.TimeoutError:
133-
logger.debug("worker did not exit by itself (RID %s), killing",
134-
self.rid)
135-
try:
136-
self.ipc.process.kill()
137-
except ProcessLookupError:
138-
pass
139-
await self.ipc.process.wait()
140-
else:
141-
logger.debug("worker exited by itself (RID %s)", self.rid)
148+
logger.warning("worker refuses to die (RID %s)", self.rid)
142149
finally:
143150
self.io_lock.release()
144151

@@ -208,7 +215,8 @@ async def _handle_worker_requests(self):
208215
reply = {"status": "ok", "data": data}
209216
except Exception as e:
210217
reply = {"status": "failed",
211-
"exception": traceback.format_exception_only(type(e), e)[0][:-1],
218+
"exception": traceback.format_exception_only(
219+
type(e), e)[0][:-1],
212220
"message": str(e),
213221
"traceback": traceback.format_tb(e.__traceback__)}
214222
await self.io_lock.acquire()
@@ -235,7 +243,8 @@ async def _worker_action(self, obj, timeout=None):
235243
del self.watchdogs[-1]
236244
return completed
237245

238-
async def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0):
246+
async def build(self, rid, pipeline_name, wd, expid, priority,
247+
timeout=15.0):
239248
self.rid = rid
240249
self.filename = os.path.basename(expid["file"])
241250
await self._create_process(expid["log_level"])
@@ -280,6 +289,7 @@ async def examine(self, rid, file, timeout=20.0):
280289

281290
await self._create_process(logging.WARNING)
282291
r = dict()
292+
283293
def register(class_name, name, arginfo):
284294
r[class_name] = {"name": name, "arginfo": arginfo}
285295
self.register_experiment = register

Diff for: ‎artiq/protocols/pc_rpc.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ def _validate_target_name(target_name, target_names):
5757
target_name = target_names[0]
5858
elif target_name not in target_names:
5959
raise IncompatibleServer(
60-
"valid target name(s): " +
61-
" ".join(sorted(target_names)))
60+
"valid target name(s): " + " ".join(sorted(target_names)))
6261
return target_name
6362

6463

@@ -92,9 +91,16 @@ class Client:
9291
Use ``None`` to skip selecting a target. The list of targets can then
9392
be retrieved using ``get_rpc_id`` and then one can be selected later
9493
using ``select_rpc_target``.
94+
:param timeout: Socket operation timeout. Use ``None`` for blocking
95+
(default), ``0`` for non-blocking, and a finite value to raise
96+
``socket.timeout`` if an operation does not complete within the
97+
given time. See also ``socket.create_connection()`` and
98+
``socket.settimeout()`` in the Python standard library. A timeout
99+
in the middle of a RPC can break subsequent RPCs (from the same
100+
client).
95101
"""
96-
def __init__(self, host, port, target_name=AutoTarget):
97-
self.__socket = socket.create_connection((host, port))
102+
def __init__(self, host, port, target_name=AutoTarget, timeout=None):
103+
self.__socket = socket.create_connection((host, port), timeout)
98104

99105
try:
100106
self.__socket.sendall(_init_string)
@@ -485,7 +491,8 @@ async def _handle_connection_cr(self, reader, writer):
485491
obj = {"status": "ok", "ret": doc}
486492
elif obj["action"] == "call":
487493
logger.debug("calling %s", _PrettyPrintCall(obj))
488-
if self.builtin_terminate and obj["name"] == "terminate":
494+
if (self.builtin_terminate and obj["name"] ==
495+
"terminate"):
489496
self._terminate_request.set()
490497
obj = {"status": "ok", "ret": None}
491498
else:

Diff for: ‎artiq/test/hardware_testbench.py

+40-10
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
import subprocess
99
import shlex
1010
import time
11+
import socket
1112

1213
from artiq.master.databases import DeviceDB, DatasetDB
1314
from artiq.master.worker_db import DeviceManager, DatasetManager
1415
from artiq.coredevice.core import CompileError
1516
from artiq.frontend.artiq_run import DummyScheduler
17+
from artiq.protocols.pc_rpc import AutoTarget, Client
1618

1719

1820
artiq_root = os.getenv("ARTIQ_ROOT")
@@ -27,13 +29,13 @@ def setUp(self):
2729
self.controllers = {}
2830

2931
def tearDown(self):
30-
for name in self.controllers:
31-
self.device_mgr.get(name).terminate()
3232
self.device_mgr.close_devices()
3333
for name in list(self.controllers):
3434
self.stop_controller(name)
3535

3636
def start_controller(self, name, sleep=1):
37+
if name in self.controllers:
38+
raise ValueError("controller `{}` already started".format(name))
3739
try:
3840
entry = self.device_db.get(name)
3941
except KeyError:
@@ -46,15 +48,43 @@ def start_controller(self, name, sleep=1):
4648
time.sleep(sleep)
4749

4850
def stop_controller(self, name, default_timeout=1):
49-
entry, proc = self.controllers[name]
50-
t = entry.get("term_timeout", default_timeout)
51-
proc.terminate()
51+
desc, proc = self.controllers[name]
52+
t = desc.get("term_timeout", default_timeout)
53+
target_name = desc.get("target_name", None)
54+
if target_name is None:
55+
target_name = AutoTarget
5256
try:
53-
proc.wait(t)
54-
except subprocess.TimeoutExpired:
55-
proc.kill()
56-
proc.wait(t)
57-
del self.controllers[name]
57+
try:
58+
client = Client(desc["host"], desc["port"], target_name, t)
59+
try:
60+
client.terminate()
61+
finally:
62+
client.close_rpc()
63+
proc.wait(t)
64+
return
65+
except (socket.timeout, subprocess.TimeoutExpired):
66+
logger.warning("Controller %s failed to exit on request", name)
67+
try:
68+
proc.terminate()
69+
except ProcessLookupError:
70+
pass
71+
try:
72+
proc.wait(t)
73+
return
74+
except subprocess.TimeoutExpired:
75+
logger.warning("Controller %s failed to exit on terminate",
76+
name)
77+
try:
78+
proc.kill()
79+
except ProcessLookupError:
80+
pass
81+
try:
82+
proc.wait(t)
83+
return
84+
except subprocess.TimeoutExpired:
85+
logger.warning("Controller %s failed to die on kill", name)
86+
finally:
87+
del self.controllers[name]
5888

5989

6090
@unittest.skipUnless(artiq_root, "no ARTIQ_ROOT")

Diff for: ‎artiq/test/test_ctlmgr.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ async def wait_for_ping(self, host, port, retries=5, timeout=2):
5656
raise asyncio.TimeoutError
5757

5858
def test_start_ping_stop_controller(self):
59-
command = sys.executable + " -m "
6059
entry = {
6160
"type": "controller",
6261
"host": "::1",
@@ -67,5 +66,6 @@ def test_start_ping_stop_controller(self):
6766
async def test():
6867
await self.start("lda_sim", entry)
6968
remote = await self.get_client(entry["host"], entry["port"])
69+
await remote.ping()
7070

7171
self.loop.run_until_complete(test())

0 commit comments

Comments
 (0)
Please sign in to comment.