Skip to content

Commit ad697d4

Browse files
committed Sep 9, 2016
master: optimize repository scan, closes #546
1 parent f23ebc0 commit ad697d4

File tree

2 files changed

+71
-55
lines changed

2 files changed

+71
-55
lines changed
 

Diff for: ‎artiq/master/experiments.py

+69-55
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
import os
33
import tempfile
44
import shutil
5+
import time
56
import logging
67

78
from artiq.protocols.sync_struct import Notifier
@@ -13,60 +14,71 @@
1314
logger = logging.getLogger(__name__)
1415

1516

16-
async def _get_repository_entries(entry_dict,
                                  root, filename, worker_handlers):
    """Examine one file with a fresh worker and record its experiments.

    A new ``Worker`` is created for this file and always closed again,
    whether or not the examination succeeds. Every class reported by the
    worker is stored in ``entry_dict`` (modified in place) under a unique
    experiment name.

    :param entry_dict: dictionary receiving one entry per experiment.
    :param root: directory that ``filename`` is relative to.
    :param filename: path of the file to examine, relative to ``root``.
    :param worker_handlers: passed through to the ``Worker`` constructor.
    :raises: whatever ``worker.examine`` raises, after it has been logged
        through ``log_worker_exception``.
    """
    worker = Worker(worker_handlers)
    try:
        description = await worker.examine(
            "scan", os.path.join(root, filename))
    except:
        # Log any failure, then let the caller decide what to do with it.
        log_worker_exception()
        raise
    finally:
        await worker.close()

    for class_name, class_desc in description.items():
        exp_name = class_desc["name"]
        exp_arginfo = class_desc["arginfo"]

        # '/' is used to separate directory levels in experiment names.
        if "/" in exp_name:
            logger.warning("Character '/' is not allowed in experiment "
                           "name (%s)", exp_name)
            exp_name = exp_name.replace("/", "_")

        # Append the first free numeric suffix when the name is taken.
        if exp_name in entry_dict:
            original = exp_name
            suffix = 0
            while exp_name in entry_dict:
                suffix += 1
                exp_name = original + str(suffix)
            logger.warning("Duplicate experiment name: '%s'\n"
                           "Renaming class '%s' in '%s' to '%s'",
                           original, class_name, filename, exp_name)

        entry_dict[exp_name] = {
            "file": filename,
            "class_name": class_name,
            "arginfo": exp_arginfo
        }
48-
49-
50-
async def _scan_experiments(root, worker_handlers, subdir=""):
    """Recursively collect the experiments found below ``root/subdir``.

    Entries whose name starts with ``.`` are ignored. Every ``*.py`` file is
    handed to ``_get_repository_entries``; a file that fails is logged and
    skipped. Sub-directories are scanned recursively and their experiment
    names are prefixed with ``"<directory name>/"``.

    :param root: top-level directory of the repository checkout.
    :param worker_handlers: passed through to ``_get_repository_entries``.
    :param subdir: directory to scan, relative to ``root``.
    :return: dictionary mapping experiment names to their entries.
    """
    entry_dict = dict()
    # os.scandir() yields entries lazily and in arbitrary order. Reading the
    # whole listing up front releases the directory handle before any await
    # or recursive call, and sorting makes the renaming of duplicate
    # experiment names independent of the filesystem's ordering.
    dir_entries = sorted(os.scandir(os.path.join(root, subdir)),
                         key=lambda entry: entry.name)
    for de in dir_entries:
        if de.name.startswith("."):
            continue
        if de.is_file() and de.name.endswith(".py"):
            filename = os.path.join(subdir, de.name)
            try:
                await _get_repository_entries(
                    entry_dict, root, filename, worker_handlers)
            except Exception as exc:
                # The traceback is omitted for WorkerInternalException.
                logger.warning("Skipping file '%s'", filename,
                    exc_info=not isinstance(exc, WorkerInternalException))
        if de.is_dir():
            subentries = await _scan_experiments(
                root, worker_handlers,
                os.path.join(subdir, de.name))
            entries = {de.name + "/" + k: v for k, v in subentries.items()}
            entry_dict.update(entries)
    return entry_dict
17+
class _RepoScanner:
    """Scan a repository tree for experiments, reusing a single worker.

    One ``Worker`` is created per call to :meth:`scan` and shared by all
    examined files. It is only replaced after a file fails to be processed.
    """

    def __init__(self, worker_handlers):
        """Store the handlers used for every ``Worker`` this scanner creates."""
        self.worker_handlers = worker_handlers
        # Set by scan(); replaced by _scan() after a file fails.
        self.worker = None

    async def process_file(self, entry_dict, root, filename):
        """Examine one file with the current worker and record its experiments.

        Every class reported by the worker is stored in ``entry_dict``
        (modified in place) under a unique experiment name.

        :param entry_dict: dictionary receiving one entry per experiment.
        :param root: directory that ``filename`` is relative to.
        :param filename: path of the file to examine, relative to ``root``.
        :raises: whatever ``self.worker.examine`` raises, after it has been
            logged through ``log_worker_exception``.
        """
        logger.debug("processing file %s %s", root, filename)
        try:
            description = await self.worker.examine(
                "scan", os.path.join(root, filename))
        except:
            # Log any failure, then let the caller decide what to do with it.
            log_worker_exception()
            raise
        for class_name, class_desc in description.items():
            name = class_desc["name"]
            arginfo = class_desc["arginfo"]
            # '/' is used to separate directory levels in experiment names.
            if "/" in name:
                logger.warning("Character '/' is not allowed in experiment "
                               "name (%s)", name)
                name = name.replace("/", "_")
            # Append the first free numeric suffix when the name is taken.
            if name in entry_dict:
                basename = name
                i = 1
                while name in entry_dict:
                    name = basename + str(i)
                    i += 1
                logger.warning("Duplicate experiment name: '%s'\n"
                               "Renaming class '%s' in '%s' to '%s'",
                               basename, class_name, filename, name)
            entry = {
                "file": filename,
                "class_name": class_name,
                "arginfo": arginfo
            }
            entry_dict[name] = entry

    async def _scan(self, root, subdir=""):
        """Recursively collect the experiments found below ``root/subdir``.

        Entries whose name starts with ``.`` are ignored. A ``*.py`` file
        that fails to be processed is logged and skipped, and the worker is
        replaced before continuing. Experiment names coming from a
        sub-directory are prefixed with ``"<directory name>/"``.

        :param root: top-level directory of the repository checkout.
        :param subdir: directory to scan, relative to ``root``.
        :return: dictionary mapping experiment names to their entries.
        """
        entry_dict = dict()
        # os.scandir() yields entries lazily and in arbitrary order. Reading
        # the whole listing up front releases the directory handle before
        # any await or recursive call, and sorting makes the renaming of
        # duplicate experiment names independent of the filesystem's
        # ordering.
        dir_entries = sorted(os.scandir(os.path.join(root, subdir)),
                             key=lambda entry: entry.name)
        for de in dir_entries:
            if de.name.startswith("."):
                continue
            if de.is_file() and de.name.endswith(".py"):
                filename = os.path.join(subdir, de.name)
                try:
                    await self.process_file(entry_dict, root, filename)
                except Exception as exc:
                    # The traceback is omitted for WorkerInternalException.
                    logger.warning("Skipping file '%s'", filename,
                        exc_info=not isinstance(exc, WorkerInternalException))
                    # restart worker
                    await self.worker.close()
                    self.worker = Worker(self.worker_handlers)
            if de.is_dir():
                subentries = await self._scan(
                    root, os.path.join(subdir, de.name))
                entries = {de.name + "/" + k: v for k, v in subentries.items()}
                entry_dict.update(entries)
        return entry_dict

    async def scan(self, root):
        """Scan the tree rooted at ``root`` and return its experiments.

        A worker is created for the duration of the scan and the current
        worker is always closed afterwards, even when the scan raises.

        :param root: top-level directory of the repository checkout.
        :return: dictionary mapping experiment names to their entries.
        """
        self.worker = Worker(self.worker_handlers)
        try:
            return await self._scan(root)
        finally:
            await self.worker.close()
7082

7183

7284
def _sync_explist(target, source):
@@ -109,7 +121,9 @@ async def scan_repository(self, new_cur_rev=None):
109121
self.repo_backend.release_rev(self.cur_rev)
110122
self.cur_rev = new_cur_rev
111123
self.status["cur_rev"] = new_cur_rev
112-
new_explist = await _scan_experiments(wd, self.worker_handlers)
124+
t1 = time.monotonic()
125+
new_explist = await _RepoScanner(self.worker_handlers).scan(wd)
126+
logger.info("repository scan took %d seconds", time.monotonic()-t1)
113127

114128
_sync_explist(self.explist, new_explist)
115129
finally:

Diff for: ‎artiq/master/worker.py

+2
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,8 @@ def _get_log_source(self):
7676
return "worker({},{})".format(self.rid, self.filename)
7777

7878
async def _create_process(self, log_level):
79+
if self.ipc is not None:
80+
return # process already exists, recycle
7981
await self.io_lock.acquire()
8082
try:
8183
if self.closed.is_set():

0 commit comments

Comments
 (0)
Please sign in to comment.