|
2 | 2 | import os
|
3 | 3 | import tempfile
|
4 | 4 | import shutil
|
| 5 | +import time |
5 | 6 | import logging
|
6 | 7 |
|
7 | 8 | from artiq.protocols.sync_struct import Notifier
|
|
13 | 14 | logger = logging.getLogger(__name__)
|
14 | 15 |
|
15 | 16 |
|
16 |
| -async def _get_repository_entries(entry_dict, |
17 |
| - root, filename, worker_handlers): |
18 |
| - worker = Worker(worker_handlers) |
19 |
| - try: |
20 |
| - description = await worker.examine("scan", os.path.join(root, filename)) |
21 |
| - except: |
22 |
| - log_worker_exception() |
23 |
| - raise |
24 |
| - finally: |
25 |
| - await worker.close() |
26 |
| - for class_name, class_desc in description.items(): |
27 |
| - name = class_desc["name"] |
28 |
| - arginfo = class_desc["arginfo"] |
29 |
| - if "/" in name: |
30 |
| - logger.warning("Character '/' is not allowed in experiment " |
31 |
| - "name (%s)", name) |
32 |
| - name = name.replace("/", "_") |
33 |
| - if name in entry_dict: |
34 |
| - basename = name |
35 |
| - i = 1 |
36 |
| - while name in entry_dict: |
37 |
| - name = basename + str(i) |
38 |
| - i += 1 |
39 |
| - logger.warning("Duplicate experiment name: '%s'\n" |
40 |
| - "Renaming class '%s' in '%s' to '%s'", |
41 |
| - basename, class_name, filename, name) |
42 |
| - entry = { |
43 |
| - "file": filename, |
44 |
| - "class_name": class_name, |
45 |
| - "arginfo": arginfo |
46 |
| - } |
47 |
| - entry_dict[name] = entry |
48 |
| - |
49 |
| - |
50 |
| -async def _scan_experiments(root, worker_handlers, subdir=""): |
51 |
| - entry_dict = dict() |
52 |
| - for de in os.scandir(os.path.join(root, subdir)): |
53 |
| - if de.name.startswith("."): |
54 |
| - continue |
55 |
| - if de.is_file() and de.name.endswith(".py"): |
56 |
| - filename = os.path.join(subdir, de.name) |
57 |
| - try: |
58 |
| - await _get_repository_entries( |
59 |
| - entry_dict, root, filename, worker_handlers) |
60 |
| - except Exception as exc: |
61 |
| - logger.warning("Skipping file '%s'", filename, |
62 |
| - exc_info=not isinstance(exc, WorkerInternalException)) |
63 |
| - if de.is_dir(): |
64 |
| - subentries = await _scan_experiments( |
65 |
| - root, worker_handlers, |
66 |
| - os.path.join(subdir, de.name)) |
67 |
| - entries = {de.name + "/" + k: v for k, v in subentries.items()} |
68 |
| - entry_dict.update(entries) |
69 |
| - return entry_dict |
| 17 | +class _RepoScanner: |
| 18 | + def __init__(self, worker_handlers): |
| 19 | + self.worker_handlers = worker_handlers |
| 20 | + self.worker = None |
| 21 | + |
| 22 | + async def process_file(self, entry_dict, root, filename): |
| 23 | + logger.debug("processing file %s %s", root, filename) |
| 24 | + try: |
| 25 | + description = await self.worker.examine( |
| 26 | + "scan", os.path.join(root, filename)) |
| 27 | + except: |
| 28 | + log_worker_exception() |
| 29 | + raise |
| 30 | + for class_name, class_desc in description.items(): |
| 31 | + name = class_desc["name"] |
| 32 | + arginfo = class_desc["arginfo"] |
| 33 | + if "/" in name: |
| 34 | + logger.warning("Character '/' is not allowed in experiment " |
| 35 | + "name (%s)", name) |
| 36 | + name = name.replace("/", "_") |
| 37 | + if name in entry_dict: |
| 38 | + basename = name |
| 39 | + i = 1 |
| 40 | + while name in entry_dict: |
| 41 | + name = basename + str(i) |
| 42 | + i += 1 |
| 43 | + logger.warning("Duplicate experiment name: '%s'\n" |
| 44 | + "Renaming class '%s' in '%s' to '%s'", |
| 45 | + basename, class_name, filename, name) |
| 46 | + entry = { |
| 47 | + "file": filename, |
| 48 | + "class_name": class_name, |
| 49 | + "arginfo": arginfo |
| 50 | + } |
| 51 | + entry_dict[name] = entry |
| 52 | + |
| 53 | + async def _scan(self, root, subdir=""): |
| 54 | + entry_dict = dict() |
| 55 | + for de in os.scandir(os.path.join(root, subdir)): |
| 56 | + if de.name.startswith("."): |
| 57 | + continue |
| 58 | + if de.is_file() and de.name.endswith(".py"): |
| 59 | + filename = os.path.join(subdir, de.name) |
| 60 | + try: |
| 61 | + await self.process_file(entry_dict, root, filename) |
| 62 | + except Exception as exc: |
| 63 | + logger.warning("Skipping file '%s'", filename, |
| 64 | + exc_info=not isinstance(exc, WorkerInternalException)) |
| 65 | + # restart worker |
| 66 | + await self.worker.close() |
| 67 | + self.worker = Worker(self.worker_handlers) |
| 68 | + if de.is_dir(): |
| 69 | + subentries = await self._scan( |
| 70 | + root, os.path.join(subdir, de.name)) |
| 71 | + entries = {de.name + "/" + k: v for k, v in subentries.items()} |
| 72 | + entry_dict.update(entries) |
| 73 | + return entry_dict |
| 74 | + |
| 75 | + async def scan(self, root): |
| 76 | + self.worker = Worker(self.worker_handlers) |
| 77 | + try: |
| 78 | + r = await self._scan(root) |
| 79 | + finally: |
| 80 | + await self.worker.close() |
| 81 | + return r |
70 | 82 |
|
71 | 83 |
|
72 | 84 | def _sync_explist(target, source):
|
@@ -109,7 +121,9 @@ async def scan_repository(self, new_cur_rev=None):
|
109 | 121 | self.repo_backend.release_rev(self.cur_rev)
|
110 | 122 | self.cur_rev = new_cur_rev
|
111 | 123 | self.status["cur_rev"] = new_cur_rev
|
112 |
| - new_explist = await _scan_experiments(wd, self.worker_handlers) |
| 124 | + t1 = time.monotonic() |
| 125 | + new_explist = await _RepoScanner(self.worker_handlers).scan(wd) |
| 126 | + logger.info("repository scan took %d seconds", time.monotonic()-t1) |
113 | 127 |
|
114 | 128 | _sync_explist(self.explist, new_explist)
|
115 | 129 | finally:
|
|
0 commit comments