Skip to content

Commit 5aa4de8

Browse files
committedJan 26, 2016
refactor logging and implement in worker
1 parent a583a92 commit 5aa4de8

File tree

7 files changed

+64
-61
lines changed

7 files changed

+64
-61
lines changed
 

Diff for: ‎artiq/frontend/artiq_ctlmgr.py

+9-22
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212

1313
from artiq.protocols.sync_struct import Subscriber
1414
from artiq.protocols.pc_rpc import AsyncioClient, Server
15-
from artiq.protocols.logging import (LogForwarder,
16-
parse_log_message, log_with_name,
15+
from artiq.protocols.logging import (LogForwarder, LogParser,
1716
SourceFilter)
1817
from artiq.tools import *
1918

@@ -79,22 +78,8 @@ async def _wait_and_ping(self):
7978
else:
8079
break
8180

82-
async def forward_logs(self, stream):
83-
source = "controller({})".format(self.name)
84-
while True:
85-
try:
86-
entry = (await stream.readline())
87-
if not entry:
88-
break
89-
entry = entry[:-1]
90-
level, name, message = parse_log_message(entry.decode())
91-
log_with_name(name, level, message, extra={"source": source})
92-
except:
93-
logger.debug("exception in log forwarding", exc_info=True)
94-
break
95-
logger.debug("stopped log forwarding of stream %s of %s",
96-
stream, self.name)
97-
81+
def _get_log_source(self):
82+
return "controller({})".format(self.name)
9883

9984
async def launcher(self):
10085
try:
@@ -105,10 +90,12 @@ async def launcher(self):
10590
self.process = await asyncio.create_subprocess_exec(
10691
*shlex.split(self.command),
10792
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
108-
asyncio.ensure_future(self.forward_logs(
109-
self.process.stdout))
110-
asyncio.ensure_future(self.forward_logs(
111-
self.process.stderr))
93+
asyncio.ensure_future(
94+
LogParser(self._get_log_source).stream_task(
95+
self.process.stdout))
96+
asyncio.ensure_future(
97+
LogParser(self._get_log_source).stream_task(
98+
self.process.stderr))
11299
await self._wait_and_ping()
113100
except FileNotFoundError:
114101
logger.warning("Controller %s failed to start", self.name)

Diff for: ‎artiq/frontend/artiq_master.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from artiq.protocols.pc_rpc import Server as RPCServer
1010
from artiq.protocols.sync_struct import Publisher
1111
from artiq.protocols.logging import Server as LoggingServer
12-
from artiq.master.log import log_args, init_log, log_worker
12+
from artiq.master.log import log_args, init_log
1313
from artiq.master.databases import DeviceDB, DatasetDB
1414
from artiq.master.scheduler import Scheduler
1515
from artiq.master.worker_db import get_last_rid
@@ -63,17 +63,15 @@ def main():
6363
repo_backend = GitBackend(args.repository)
6464
else:
6565
repo_backend = FilesystemBackend(args.repository)
66-
experiment_db = ExperimentDB(repo_backend, device_db.get_device_db,
67-
log_worker)
66+
experiment_db = ExperimentDB(repo_backend, device_db.get_device_db)
6867
atexit.register(experiment_db.close)
6968
experiment_db.scan_repository_async()
7069

7170
worker_handlers = {
7271
"get_device_db": device_db.get_device_db,
7372
"get_device": device_db.get,
7473
"get_dataset": dataset_db.get,
75-
"update_dataset": dataset_db.update,
76-
"log": log_worker
74+
"update_dataset": dataset_db.update
7775
}
7876
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, experiment_db)
7977
worker_handlers.update({

Diff for: ‎artiq/master/experiments.py

+10-19
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import tempfile
44
import shutil
55
import logging
6-
from functools import partial
76

87
from artiq.protocols.sync_struct import Notifier
98
from artiq.master.worker import (Worker, WorkerInternalException,
@@ -15,13 +14,10 @@
1514

1615

1716
async def _get_repository_entries(entry_dict,
18-
root, filename, get_device_db, log):
19-
worker = Worker({
20-
"get_device_db": get_device_db,
21-
"log": partial(log, "scan", os.path.basename(filename))
22-
})
17+
root, filename, get_device_db):
18+
worker = Worker({"get_device_db": get_device_db})
2319
try:
24-
description = await worker.examine(os.path.join(root, filename))
20+
description = await worker.examine("scan", os.path.join(root, filename))
2521
except:
2622
log_worker_exception()
2723
raise
@@ -49,7 +45,7 @@ async def _get_repository_entries(entry_dict,
4945
entry_dict[name] = entry
5046

5147

52-
async def _scan_experiments(root, get_device_db, log, subdir=""):
48+
async def _scan_experiments(root, get_device_db, subdir=""):
5349
entry_dict = dict()
5450
for de in os.scandir(os.path.join(root, subdir)):
5551
if de.name.startswith("."):
@@ -58,13 +54,13 @@ async def _scan_experiments(root, get_device_db, log, subdir=""):
5854
filename = os.path.join(subdir, de.name)
5955
try:
6056
await _get_repository_entries(
61-
entry_dict, root, filename, get_device_db, log)
57+
entry_dict, root, filename, get_device_db)
6258
except Exception as exc:
6359
logger.warning("Skipping file '%s'", filename,
6460
exc_info=not isinstance(exc, WorkerInternalException))
6561
if de.is_dir():
6662
subentries = await _scan_experiments(
67-
root, get_device_db, log,
63+
root, get_device_db,
6864
os.path.join(subdir, de.name))
6965
entries = {de.name + "/" + k: v for k, v in subentries.items()}
7066
entry_dict.update(entries)
@@ -81,10 +77,9 @@ def _sync_explist(target, source):
8177

8278

8379
class ExperimentDB:
84-
def __init__(self, repo_backend, get_device_db_fn, log_fn):
80+
def __init__(self, repo_backend, get_device_db_fn):
8581
self.repo_backend = repo_backend
8682
self.get_device_db_fn = get_device_db_fn
87-
self.log_fn = log_fn
8883

8984
self.cur_rev = self.repo_backend.get_head_rev()
9085
self.repo_backend.request_rev(self.cur_rev)
@@ -106,8 +101,7 @@ async def scan_repository(self, new_cur_rev=None):
106101
wd, _ = self.repo_backend.request_rev(new_cur_rev)
107102
self.repo_backend.release_rev(self.cur_rev)
108103
self.cur_rev = new_cur_rev
109-
new_explist = await _scan_experiments(wd, self.get_device_db_fn,
110-
self.log_fn)
104+
new_explist = await _scan_experiments(wd, self.get_device_db_fn)
111105

112106
_sync_explist(self.explist, new_explist)
113107
finally:
@@ -122,12 +116,9 @@ async def examine(self, filename, use_repository=True):
122116
revision = self.cur_rev
123117
wd, _ = self.repo_backend.request_rev(revision)
124118
filename = os.path.join(wd, filename)
125-
worker = Worker({
126-
"get_device_db": self.get_device_db_fn,
127-
"log": partial(self.log_fn, "examine", os.path.basename(filename))
128-
})
119+
worker = Worker({"get_device_db": self.get_device_db_fn})
129120
try:
130-
description = await worker.examine(filename)
121+
description = await worker.examine("examine", filename)
131122
finally:
132123
await worker.close()
133124
if use_repository:

Diff for: ‎artiq/master/log.py

+1-9
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging.handlers
33

44
from artiq.protocols.sync_struct import Notifier
5-
from artiq.protocols.logging import parse_log_message, log_with_name, SourceFilter
5+
from artiq.protocols.logging import SourceFilter
66

77

88
class LogBuffer:
@@ -28,14 +28,6 @@ def emit(self, record):
2828
self.log_buffer.log(record.levelno, record.source, record.created,
2929
part)
3030

31-
32-
def log_worker(rid, filename, message):
33-
level, name, message = parse_log_message(message)
34-
log_with_name(name, level, message,
35-
extra={"source": "worker({},{})".format(rid, filename)})
36-
log_worker.worker_pass_runinfo = True
37-
38-
3931
def log_args(parser):
4032
group = parser.add_argument_group("logging")
4133
group.add_argument("-v", "--verbose", default=0, action="count",

Diff for: ‎artiq/master/worker.py

+17-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from functools import partial
99

1010
from artiq.protocols import pipe_ipc, pyon
11+
from artiq.protocols.logging import LogParser
1112
from artiq.tools import asyncio_wait_or_cancel
1213

1314

@@ -72,6 +73,9 @@ def watchdog_time(self):
7273
else:
7374
return None
7475

76+
def _get_log_source(self):
77+
return "worker({},{})".format(self.rid, self.filename)
78+
7579
async def _create_process(self, log_level):
7680
await self.io_lock.acquire()
7781
try:
@@ -80,7 +84,14 @@ async def _create_process(self, log_level):
8084
self.ipc = pipe_ipc.AsyncioParentComm()
8185
await self.ipc.create_subprocess(
8286
sys.executable, "-m", "artiq.master.worker_impl",
83-
self.ipc.get_address(), str(log_level))
87+
self.ipc.get_address(), str(log_level),
88+
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
89+
asyncio.ensure_future(
90+
LogParser(self._get_log_source).stream_task(
91+
self.ipc.process.stdout))
92+
asyncio.ensure_future(
93+
LogParser(self._get_log_source).stream_task(
94+
self.ipc.process.stderr))
8495
finally:
8596
self.io_lock.release()
8697

@@ -94,7 +105,7 @@ async def close(self, term_timeout=1.0):
94105
await self.io_lock.acquire()
95106
try:
96107
if self.ipc is None:
97-
# Note the %s - self.rid can be None
108+
# Note the %s - self.rid can be None or a user string
98109
logger.debug("worker was not created (RID %s)", self.rid)
99110
return
100111
if self.ipc.process.returncode is not None:
@@ -192,8 +203,6 @@ async def _handle_worker_requests(self):
192203
func = self.register_experiment
193204
else:
194205
func = self.handlers[action]
195-
if getattr(func, "worker_pass_runinfo", False):
196-
func = partial(func, self.rid, self.filename)
197206
try:
198207
data = func(*obj["args"], **obj["kwargs"])
199208
reply = {"status": "ok", "data": data}
@@ -265,7 +274,10 @@ async def write_results(self, timeout=15.0):
265274
await self._worker_action({"action": "write_results"},
266275
timeout)
267276

268-
async def examine(self, file, timeout=20.0):
277+
async def examine(self, rid, file, timeout=20.0):
278+
self.rid = rid
279+
self.filename = os.path.basename(file)
280+
269281
await self._create_process(logging.WARNING)
270282
r = dict()
271283
def register(class_name, name, arginfo):

Diff for: ‎artiq/protocols/logging.py

+23
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,29 @@ def parse_log_message(msg):
3636
return logging.INFO, "print", msg
3737

3838

39+
class LogParser:
40+
def __init__(self, source_cb):
41+
self.source_cb = source_cb
42+
43+
def line_input(self, msg):
44+
level, name, message = parse_log_message(msg)
45+
log_with_name(name, level, message,
46+
extra={"source": self.source_cb()})
47+
48+
async def stream_task(self, stream):
49+
while True:
50+
try:
51+
entry = (await stream.readline())
52+
if not entry:
53+
break
54+
self.line_input(entry[:-1].decode())
55+
except:
56+
logger.debug("exception in log forwarding", exc_info=True)
57+
break
58+
logger.debug("stopped log forwarding of stream %s of %s",
59+
stream, self.source_cb())
60+
61+
3962
_init_string = b"ARTIQ logging\n"
4063

4164

Diff for: ‎artiq/test/worker.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def _run_experiment(class_name):
7171
"arguments": dict()
7272
}
7373
loop = asyncio.get_event_loop()
74-
worker = Worker(handlers={"log": lambda message: None})
74+
worker = Worker({})
7575
loop.run_until_complete(_call_worker(worker, expid))
7676

7777

0 commit comments

Comments
 (0)
Please sign in to comment.