Skip to content

Commit 7ed8fe5

Browse files
committed Aug 7, 2015
Git support
1 parent 968760d commit 7ed8fe5

File tree

11 files changed

+165
-41
lines changed

11 files changed

+165
-41
lines changed
 

Diff for: ‎artiq/frontend/artiq_client.py

+19-6
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ def get_argparser():
4242
parser_add.add_argument("-f", "--flush", default=False, action="store_true",
4343
help="flush the pipeline before preparing "
4444
"the experiment")
45+
parser_add.add_argument("-R", "--repository", default=False,
46+
action="store_true",
47+
help="use the experiment repository")
48+
parser_add.add_argument("-r", "--revision", default=None,
49+
help="use a specific repository revision "
50+
"(defaults to head, ignored without -R)")
4551
parser_add.add_argument("-c", "--class-name", default=None,
4652
help="name of the class to run")
4753
parser_add.add_argument("file",
@@ -81,8 +87,8 @@ def get_argparser():
8187
"what",
8288
help="select object to show: schedule/devices/parameters")
8389

84-
parser_scan_repository = subparsers.add_parser(
85-
"scan-repository", help="rescan repository")
90+
subparsers.add_parser("scan-repository",
91+
help="trigger a repository rescan")
8692

8793
return parser
8894

@@ -107,6 +113,8 @@ def _action_submit(remote, args):
107113
"class_name": args.class_name,
108114
"arguments": arguments,
109115
}
116+
if args.repository:
117+
expid["repo_rev"] = args.revision
110118
if args.timed is None:
111119
due_date = None
112120
else:
@@ -148,19 +156,24 @@ def _show_schedule(schedule):
148156
x[1]["due_date"] or 0,
149157
x[0]))
150158
table = PrettyTable(["RID", "Pipeline", " Status ", "Prio",
151-
"Due date", "File", "Class name"])
159+
"Due date", "Revision", "File", "Class name"])
152160
for rid, v in l:
153161
row = [rid, v["pipeline"], v["status"], v["priority"]]
154162
if v["due_date"] is None:
155163
row.append("")
156164
else:
157165
row.append(time.strftime("%m/%d %H:%M:%S",
158166
time.localtime(v["due_date"])))
159-
row.append(v["expid"]["file"])
160-
if v["expid"]["class_name"] is None:
167+
expid = v["expid"]
168+
if "repo_rev" in expid:
169+
row.append(expid["repo_rev"])
170+
else:
171+
row.append("Outside repo.")
172+
row.append(expid["file"])
173+
if expid["class_name"] is None:
161174
row.append("")
162175
else:
163-
row.append(v["expid"]["class_name"])
176+
row.append(expid["class_name"])
164177
table.add_row(row)
165178
print(table)
166179
else:

Diff for: ‎artiq/frontend/artiq_master.py

+16-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from artiq.protocols.file_db import FlatFileDB
1111
from artiq.master.scheduler import Scheduler
1212
from artiq.master.worker_db import get_last_rid
13-
from artiq.master.repository import Repository
13+
from artiq.master.repository import FilesystemBackend, GitBackend, Repository
1414
from artiq.tools import verbosity_args, init_logger
1515

1616

@@ -26,6 +26,13 @@ def get_argparser():
2626
group.add_argument(
2727
"--port-control", default=3251, type=int,
2828
help="TCP port to listen to for control (default: %(default)d)")
29+
group = parser.add_argument_group("repository")
30+
group.add_argument(
31+
"-g", "--git", default=False, action="store_true",
32+
help="use the Git repository backend")
33+
group.add_argument(
34+
"-r", "--repository", default="repository",
35+
help="path to the repository (default: '%(default)s')")
2936
verbosity_args(parser)
3037
return parser
3138

@@ -57,21 +64,25 @@ def main():
5764
rtr = Notifier(dict())
5865
log = Log(1000)
5966

67+
if args.git:
68+
repo_backend = GitBackend(args.repository)
69+
else:
70+
repo_backend = FilesystemBackend(args.repository)
71+
repository = Repository(repo_backend, log.log)
72+
repository.scan_async()
73+
6074
worker_handlers = {
6175
"get_device": ddb.get,
6276
"get_parameter": pdb.get,
6377
"set_parameter": pdb.set,
6478
"update_rt_results": lambda mod: process_mod(rtr, mod),
6579
"log": log.log
6680
}
67-
scheduler = Scheduler(get_last_rid() + 1, worker_handlers)
81+
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
6882
worker_handlers["scheduler_submit"] = scheduler.submit
6983
scheduler.start()
7084
atexit.register(lambda: loop.run_until_complete(scheduler.stop()))
7185

72-
repository = Repository(log.log)
73-
repository.scan_async()
74-
7586
server_control = Server({
7687
"master_ddb": ddb,
7788
"master_pdb": pdb,

Diff for: ‎artiq/gui/explorer.py

+1
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ def init_explist_model(self, init):
256256
def submit(self, pipeline_name, file, class_name, arguments,
257257
priority, due_date, flush):
258258
expid = {
259+
"repo_rev": None,
259260
"file": file,
260261
"class_name": class_name,
261262
"arguments": arguments,

Diff for: ‎artiq/gui/schedule.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class _ScheduleModel(DictSyncModel):
1212
def __init__(self, parent, init):
1313
DictSyncModel.__init__(self,
1414
["RID", "Pipeline", "Status", "Prio", "Due date",
15-
"File", "Class name"],
15+
"Revision", "File", "Class name"],
1616
parent, init)
1717

1818
def sort_key(self, k, v):
@@ -35,8 +35,14 @@ def convert(self, k, v, column):
3535
return time.strftime("%m/%d %H:%M:%S",
3636
time.localtime(v["due_date"]))
3737
elif column == 5:
38-
return v["expid"]["file"]
38+
expid = v["expid"]
39+
if "repo_rev" in expid:
40+
return expid["repo_rev"]
41+
else:
42+
return "Outside repo."
3943
elif column == 6:
44+
return v["expid"]["file"]
45+
elif column == 7:
4046
if v["expid"]["class_name"] is None:
4147
return ""
4248
else:

Diff for: ‎artiq/master/repository.py

+75-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import asyncio
12
import os
3+
import tempfile
4+
import shutil
25
import logging
3-
import asyncio
46

57
from artiq.protocols.sync_struct import Notifier
68
from artiq.master.worker import Worker
@@ -10,15 +12,14 @@
1012

1113

1214
@asyncio.coroutine
13-
def _scan_experiments(log):
15+
def _scan_experiments(wd, log):
1416
r = dict()
15-
for f in os.listdir("repository"):
17+
for f in os.listdir(wd):
1618
if f.endswith(".py"):
1719
try:
18-
full_name = os.path.join("repository", f)
1920
worker = Worker({"log": lambda message: log("scan", message)})
2021
try:
21-
description = yield from worker.examine(full_name)
22+
description = yield from worker.examine(os.path.join(wd, f))
2223
finally:
2324
yield from worker.close()
2425
for class_name, class_desc in description.items():
@@ -32,7 +33,7 @@ def _scan_experiments(log):
3233
name = basename + str(i)
3334
i += 1
3435
entry = {
35-
"file": full_name,
36+
"file": f,
3637
"class_name": class_name,
3738
"arguments": arguments
3839
}
@@ -52,19 +53,84 @@ def _sync_explist(target, source):
5253

5354

5455
class Repository:
    """Keep an experiment list in sync with the head revision of a backend.

    ``backend`` must provide ``get_head_rev()``, ``request_rev(rev)``
    (returning a working directory) and ``release_rev(rev)``.
    ``log_fn`` is forwarded to ``_scan_experiments``.

    ``explist`` is a ``Notifier`` wrapping a dict; its contents are updated
    by ``_sync_explist`` after each successful scan.
    """

    def __init__(self, backend, log_fn):
        self.backend = backend
        self.log_fn = log_fn

        # Hold a reference on the current head revision from the start, so
        # that the first scan() has something to release.
        self.head_rev = self.backend.get_head_rev()
        self.backend.request_rev(self.head_rev)
        self.explist = Notifier(dict())

        self._scanning = False

    @asyncio.coroutine
    def scan(self):
        """Rescan the backend's head revision and update ``explist``.

        Returns immediately if another scan is already in progress.
        """
        if self._scanning:
            return
        self._scanning = True
        try:
            new_head_rev = self.backend.get_head_rev()
            # Request the new revision before releasing the old one: when
            # both are the same revision its reference count never reaches
            # zero in between.
            wd = self.backend.request_rev(new_head_rev)
            self.backend.release_rev(self.head_rev)
            self.head_rev = new_head_rev
            new_explist = yield from _scan_experiments(wd, self.log_fn)

            _sync_explist(self.explist, new_explist)
        finally:
            # Always clear the flag; otherwise one failed scan would make
            # every later scan() return early forever.
            self._scanning = False

    def scan_async(self):
        """Schedule ``scan()`` on the current event loop without waiting."""
        # Equivalent to the former ``asyncio.async(...)`` call for a
        # coroutine argument, but ``async`` is a reserved word from
        # Python 3.7 on and can no longer be used as an attribute name.
        asyncio.get_event_loop().create_task(self.scan())
83+
84+
85+
class FilesystemBackend:
    """Repository backend backed by a plain directory.

    There is no revision tracking: every revision maps to the same
    directory and the head revision is always reported as ``"N/A"``.
    """

    def __init__(self, root):
        # Stored in absolute form, resolved against the working directory
        # at construction time.
        self.root = os.path.abspath(root)

    def get_head_rev(self):
        """Return the placeholder revision name ``"N/A"``."""
        return "N/A"

    def request_rev(self, rev):
        """Return the root directory; ``rev`` is ignored."""
        return self.root

    def release_rev(self, rev):
        """Do nothing; there is nothing to release for a plain directory."""
        return None
97+
98+
99+
class _GitCheckout:
    """Checkout of one Git revision into a freshly created temporary folder.

    ``git`` is the repository object; it must provide ``get(rev)`` and
    ``checkout_tree(obj, directory=...)``. ``path`` is the temporary folder
    and ``ref_count`` starts at 1; the owner adjusts it and calls
    ``dispose()`` when the checkout is no longer needed.
    """

    def __init__(self, git, rev):
        self.path = tempfile.mkdtemp()
        try:
            git.checkout_tree(git.get(rev), directory=self.path)
        except BaseException:
            # mkdtemp() leaves deletion to the caller. Without this, every
            # failed checkout (e.g. unknown revision) would leak a folder.
            # Errors during cleanup are ignored so that the original
            # exception is the one that propagates.
            shutil.rmtree(self.path, ignore_errors=True)
            raise
        self.ref_count = 1
        logger.info("checked out revision %s into %s", rev, self.path)

    def dispose(self):
        """Delete the temporary folder and everything in it."""
        logger.info("disposing of checkout in folder %s", self.path)
        shutil.rmtree(self.path)
109+
110+
111+
class GitBackend:
    """Repository backend that serves revisions of a Git repository.

    Each requested revision is checked out once into its own temporary
    folder (see ``_GitCheckout``) and shared between users through a
    reference count. ``checkouts`` maps the revision, exactly as passed by
    the caller, to its checkout object.
    """

    def __init__(self, root):
        # lazy import - make dependency optional
        import pygit2

        self.git = pygit2.Repository(root)
        self.checkouts = dict()

    def get_head_rev(self):
        """Return the target of the repository's HEAD as a string."""
        return str(self.git.head.target)

    def request_rev(self, rev):
        """Take a reference on ``rev`` and return its checkout folder.

        The revision is checked out on first request; further requests
        only increment the reference count. Every call must be balanced by
        one ``release_rev(rev)``.
        """
        if rev in self.checkouts:
            co = self.checkouts[rev]
            co.ref_count += 1
        else:
            co = _GitCheckout(self.git, rev)
            self.checkouts[rev] = co
        return co.path

    def release_rev(self, rev):
        """Drop one reference on ``rev``; dispose of it at zero.

        Raises ``KeyError`` if ``rev`` has no active checkout.
        """
        co = self.checkouts[rev]
        co.ref_count -= 1
        if not co.ref_count:
            # Unregister before disposing: if dispose() fails, a checkout
            # with zero references and a possibly half-deleted folder must
            # not stay registered, or a later request_rev() would hand out
            # its path again.
            del self.checkouts[rev]
            co.dispose()

Diff for: ‎artiq/master/scheduler.py

+25-9
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,12 @@ def worker_method(self, *args, **kwargs):
4747

4848
class Run:
4949
def __init__(self, rid, pipeline_name,
50-
expid, priority, due_date, flush,
50+
wd, expid, priority, due_date, flush,
5151
worker_handlers, notifier):
5252
# called through pool
5353
self.rid = rid
5454
self.pipeline_name = pipeline_name
55+
self.wd = wd
5556
self.expid = expid
5657
self.priority = priority
5758
self.due_date = due_date
@@ -103,7 +104,8 @@ def close(self):
103104

104105
@asyncio.coroutine
105106
def build(self):
106-
yield from self._build(self.rid, self.pipeline_name, self.expid,
107+
yield from self._build(self.rid, self.pipeline_name,
108+
self.wd, self.expid,
107109
self.priority)
108110

109111
prepare = _mk_worker_method("prepare")
@@ -124,18 +126,26 @@ def get(self):
124126

125127

126128
class RunPool:
127-
def __init__(self, ridc, worker_handlers, notifier):
129+
def __init__(self, ridc, worker_handlers, notifier, repo_backend):
128130
self.runs = dict()
129131
self.submitted_cb = None
130132

131133
self._ridc = ridc
132134
self._worker_handlers = worker_handlers
133135
self._notifier = notifier
136+
self._repo_backend = repo_backend
134137

135138
def submit(self, expid, priority, due_date, flush, pipeline_name):
139+
# mutates expid to insert head repository revision if None
136140
# called through scheduler
137141
rid = self._ridc.get()
138-
run = Run(rid, pipeline_name, expid, priority, due_date, flush,
142+
if "repo_rev" in expid:
143+
if expid["repo_rev"] is None:
144+
expid["repo_rev"] = self._repo_backend.get_head_rev()
145+
wd = self._repo_backend.request_rev(expid["repo_rev"])
146+
else:
147+
wd = None
148+
run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
139149
self._worker_handlers, self._notifier)
140150
self.runs[rid] = run
141151
if self.submitted_cb is not None:
@@ -147,7 +157,10 @@ def delete(self, rid):
147157
# called through deleter
148158
if rid not in self.runs:
149159
return
150-
yield from self.runs[rid].close()
160+
run = self.runs[rid]
161+
yield from run.close()
162+
if "repo_rev" in run.expid:
163+
self._repo_backend.release_rev(run.expid["repo_rev"])
151164
del self.runs[rid]
152165

153166

@@ -280,12 +293,12 @@ def _do(self):
280293

281294

282295
class Pipeline:
283-
def __init__(self, ridc, deleter, worker_handlers, notifier):
296+
def __init__(self, ridc, deleter, worker_handlers, notifier, repo_backend):
284297
flush_tracker = WaitSet()
285298
def delete_cb(rid):
286299
deleter.delete(rid)
287300
flush_tracker.discard(rid)
288-
self.pool = RunPool(ridc, worker_handlers, notifier)
301+
self.pool = RunPool(ridc, worker_handlers, notifier, repo_backend)
289302
self._prepare = PrepareStage(flush_tracker, delete_cb,
290303
self.pool, asyncio.Queue(maxsize=1))
291304
self._run = RunStage(delete_cb,
@@ -348,11 +361,12 @@ def _do(self):
348361

349362

350363
class Scheduler:
351-
def __init__(self, next_rid, worker_handlers):
364+
def __init__(self, next_rid, worker_handlers, repo_backend):
352365
self.notifier = Notifier(dict())
353366

354367
self._pipelines = dict()
355368
self._worker_handlers = worker_handlers
369+
self._repo_backend = repo_backend
356370
self._terminated = False
357371

358372
self._ridc = RIDCounter(next_rid)
@@ -374,14 +388,16 @@ def stop(self):
374388
logger.warning("some pipelines were not garbage-collected")
375389

376390
def submit(self, pipeline_name, expid, priority, due_date, flush):
391+
# mutates expid to insert head repository revision if None
377392
if self._terminated:
378393
return
379394
try:
380395
pipeline = self._pipelines[pipeline_name]
381396
except KeyError:
382397
logger.debug("creating pipeline '%s'", pipeline_name)
383398
pipeline = Pipeline(self._ridc, self._deleter,
384-
self._worker_handlers, self.notifier)
399+
self._worker_handlers, self.notifier,
400+
self._repo_backend)
385401
self._pipelines[pipeline_name] = pipeline
386402
pipeline.start()
387403
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)

Diff for: ‎artiq/master/worker.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,14 @@ def _worker_action(self, obj, timeout=None):
209209
return completed
210210

211211
@asyncio.coroutine
212-
def build(self, rid, pipeline_name, expid, priority, timeout=15.0):
212+
def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0):
213213
self.rid = rid
214214
yield from self._create_process()
215215
yield from self._worker_action(
216216
{"action": "build",
217217
"rid": rid,
218218
"pipeline_name": pipeline_name,
219+
"wd": wd,
219220
"expid": expid,
220221
"priority": priority},
221222
timeout)

Diff for: ‎artiq/master/worker_impl.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import sys
22
import time
3+
import os
34

45
from artiq.protocols import pyon
56
from artiq.tools import file_import
@@ -44,8 +45,6 @@ def parent_action(*args):
4445
return parent_action
4546

4647

47-
48-
4948
class LogForwarder:
5049
def __init__(self):
5150
self.buffer = ""
@@ -175,7 +174,12 @@ def main():
175174
start_time = time.localtime()
176175
rid = obj["rid"]
177176
expid = obj["expid"]
178-
exp = get_exp(expid["file"], expid["class_name"])
177+
if obj["wd"] is not None:
178+
# Using repository
179+
expf = os.path.join(obj["wd"], expid["file"])
180+
else:
181+
expf = expid["file"]
182+
exp = get_exp(expf, expid["class_name"])
179183
dmgr.virtual_devices["scheduler"].set_run_info(
180184
obj["pipeline_name"], expid, obj["priority"])
181185
exp_inst = exp(dmgr, ParentPDB, rdb,
@@ -194,6 +198,11 @@ def main():
194198
f = get_hdf5_output(start_time, rid, exp.__name__)
195199
try:
196200
rdb.write_hdf5(f)
201+
if "repo_rev" in expid:
202+
rr = expid["repo_rev"]
203+
dtype = "S{}".format(len(rr))
204+
dataset = f.create_dataset("repo_rev", (), dtype)
205+
dataset[()] = rr.encode()
197206
finally:
198207
f.close()
199208
put_object({"action": "completed"})

Diff for: ‎artiq/test/scheduler.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def setUp(self):
6767

6868
def test_steps(self):
6969
loop = self.loop
70-
scheduler = Scheduler(0, _handlers)
70+
scheduler = Scheduler(0, _handlers, None)
7171
expid = _get_expid("EmptyExperiment")
7272

7373
expect = _get_basic_steps(1, expid)
@@ -102,7 +102,7 @@ def notify(mod):
102102

103103
def test_pause(self):
104104
loop = self.loop
105-
scheduler = Scheduler(0, _handlers)
105+
scheduler = Scheduler(0, _handlers, None)
106106
expid_bg = _get_expid("BackgroundExperiment")
107107
expid = _get_expid("EmptyExperiment")
108108

@@ -133,7 +133,7 @@ def notify(mod):
133133

134134
def test_flush(self):
135135
loop = self.loop
136-
scheduler = Scheduler(0, _handlers)
136+
scheduler = Scheduler(0, _handlers, None)
137137
expid = _get_expid("EmptyExperiment")
138138

139139
expect = _get_basic_steps(1, expid, 1, True)

Diff for: ‎artiq/test/worker.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def run(self):
3838
@asyncio.coroutine
3939
def _call_worker(worker, expid):
4040
try:
41-
yield from worker.build(0, "main", expid, 0)
41+
yield from worker.build(0, "main", None, expid, 0)
4242
yield from worker.prepare()
4343
yield from worker.run()
4444
yield from worker.analyze()

Diff for: ‎setup.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sys
55
import os
66

7+
78
if sys.version_info[:3] < (3, 4, 3):
89
raise Exception("You need at least Python 3.4.3 to run ARTIQ")
910

@@ -20,7 +21,7 @@ def run(self):
2021
requirements = [
2122
"sphinx", "sphinx-argparse", "pyserial", "numpy", "scipy",
2223
"python-dateutil", "prettytable", "h5py", "pydaqmx", "pyelftools",
23-
"quamash", "pyqtgraph", "llvmlite_artiq"
24+
"quamash", "pyqtgraph", "llvmlite_artiq", "pygit2"
2425
]
2526

2627
scripts = [
@@ -63,5 +64,5 @@ def run(self):
6364
entry_points={
6465
"console_scripts": scripts,
6566
},
66-
cmdclass={"push_doc":PushDocCommand}
67+
cmdclass={"push_doc": PushDocCommand}
6768
)

0 commit comments

Comments
 (0)
Please sign in to comment.