Skip to content

Commit

Permalink
Git support
Browse files Browse the repository at this point in the history
  • Loading branch information
sbourdeauducq committed Aug 7, 2015
1 parent 968760d commit 7ed8fe5
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 41 deletions.
25 changes: 19 additions & 6 deletions artiq/frontend/artiq_client.py
Expand Up @@ -42,6 +42,12 @@ def get_argparser():
parser_add.add_argument("-f", "--flush", default=False, action="store_true",
help="flush the pipeline before preparing "
"the experiment")
parser_add.add_argument("-R", "--repository", default=False,
action="store_true",
help="use the experiment repository")
parser_add.add_argument("-r", "--revision", default=None,
help="use a specific repository revision "
"(defaults to head, ignored without -R)")
parser_add.add_argument("-c", "--class-name", default=None,
help="name of the class to run")
parser_add.add_argument("file",
Expand Down Expand Up @@ -81,8 +87,8 @@ def get_argparser():
"what",
help="select object to show: schedule/devices/parameters")

parser_scan_repository = subparsers.add_parser(
"scan-repository", help="rescan repository")
subparsers.add_parser("scan-repository",
help="trigger a repository rescan")

return parser

Expand All @@ -107,6 +113,8 @@ def _action_submit(remote, args):
"class_name": args.class_name,
"arguments": arguments,
}
if args.repository:
expid["repo_rev"] = args.revision
if args.timed is None:
due_date = None
else:
Expand Down Expand Up @@ -148,19 +156,24 @@ def _show_schedule(schedule):
x[1]["due_date"] or 0,
x[0]))
table = PrettyTable(["RID", "Pipeline", " Status ", "Prio",
"Due date", "File", "Class name"])
"Due date", "Revision", "File", "Class name"])
for rid, v in l:
row = [rid, v["pipeline"], v["status"], v["priority"]]
if v["due_date"] is None:
row.append("")
else:
row.append(time.strftime("%m/%d %H:%M:%S",
time.localtime(v["due_date"])))
row.append(v["expid"]["file"])
if v["expid"]["class_name"] is None:
expid = v["expid"]
if "repo_rev" in expid:
row.append(expid["repo_rev"])
else:
row.append("Outside repo.")
row.append(expid["file"])
if expid["class_name"] is None:
row.append("")
else:
row.append(v["expid"]["class_name"])
row.append(expid["class_name"])
table.add_row(row)
print(table)
else:
Expand Down
21 changes: 16 additions & 5 deletions artiq/frontend/artiq_master.py
Expand Up @@ -10,7 +10,7 @@
from artiq.protocols.file_db import FlatFileDB
from artiq.master.scheduler import Scheduler
from artiq.master.worker_db import get_last_rid
from artiq.master.repository import Repository
from artiq.master.repository import FilesystemBackend, GitBackend, Repository
from artiq.tools import verbosity_args, init_logger


Expand All @@ -26,6 +26,13 @@ def get_argparser():
group.add_argument(
"--port-control", default=3251, type=int,
help="TCP port to listen to for control (default: %(default)d)")
group = parser.add_argument_group("repository")
group.add_argument(
"-g", "--git", default=False, action="store_true",
help="use the Git repository backend")
group.add_argument(
"-r", "--repository", default="repository",
help="path to the repository (default: '%(default)s')")
verbosity_args(parser)
return parser

Expand Down Expand Up @@ -57,21 +64,25 @@ def main():
rtr = Notifier(dict())
log = Log(1000)

if args.git:
repo_backend = GitBackend(args.repository)
else:
repo_backend = FilesystemBackend(args.repository)
repository = Repository(repo_backend, log.log)
repository.scan_async()

worker_handlers = {
"get_device": ddb.get,
"get_parameter": pdb.get,
"set_parameter": pdb.set,
"update_rt_results": lambda mod: process_mod(rtr, mod),
"log": log.log
}
scheduler = Scheduler(get_last_rid() + 1, worker_handlers)
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
worker_handlers["scheduler_submit"] = scheduler.submit
scheduler.start()
atexit.register(lambda: loop.run_until_complete(scheduler.stop()))

repository = Repository(log.log)
repository.scan_async()

server_control = Server({
"master_ddb": ddb,
"master_pdb": pdb,
Expand Down
1 change: 1 addition & 0 deletions artiq/gui/explorer.py
Expand Up @@ -256,6 +256,7 @@ def init_explist_model(self, init):
def submit(self, pipeline_name, file, class_name, arguments,
priority, due_date, flush):
expid = {
"repo_rev": None,
"file": file,
"class_name": class_name,
"arguments": arguments,
Expand Down
10 changes: 8 additions & 2 deletions artiq/gui/schedule.py
Expand Up @@ -12,7 +12,7 @@ class _ScheduleModel(DictSyncModel):
def __init__(self, parent, init):
DictSyncModel.__init__(self,
["RID", "Pipeline", "Status", "Prio", "Due date",
"File", "Class name"],
"Revision", "File", "Class name"],
parent, init)

def sort_key(self, k, v):
Expand All @@ -35,8 +35,14 @@ def convert(self, k, v, column):
return time.strftime("%m/%d %H:%M:%S",
time.localtime(v["due_date"]))
elif column == 5:
return v["expid"]["file"]
expid = v["expid"]
if "repo_rev" in expid:
return expid["repo_rev"]
else:
return "Outside repo."
elif column == 6:
return v["expid"]["file"]
elif column == 7:
if v["expid"]["class_name"] is None:
return ""
else:
Expand Down
84 changes: 75 additions & 9 deletions artiq/master/repository.py
@@ -1,6 +1,8 @@
import asyncio
import os
import tempfile
import shutil
import logging
import asyncio

from artiq.protocols.sync_struct import Notifier
from artiq.master.worker import Worker
Expand All @@ -10,15 +12,14 @@


@asyncio.coroutine
def _scan_experiments(log):
def _scan_experiments(wd, log):
r = dict()
for f in os.listdir("repository"):
for f in os.listdir(wd):
if f.endswith(".py"):
try:
full_name = os.path.join("repository", f)
worker = Worker({"log": lambda message: log("scan", message)})
try:
description = yield from worker.examine(full_name)
description = yield from worker.examine(os.path.join(wd, f))
finally:
yield from worker.close()
for class_name, class_desc in description.items():
Expand All @@ -32,7 +33,7 @@ def _scan_experiments(log):
name = basename + str(i)
i += 1
entry = {
"file": full_name,
"file": f,
"class_name": class_name,
"arguments": arguments
}
Expand All @@ -52,19 +53,84 @@ def _sync_explist(target, source):


class Repository:
    """Scan a repository backend for experiments and publish the result.

    A scan pins the backend's current head revision, examines every
    experiment file in the corresponding working directory with a worker,
    and mirrors the resulting experiment list into the ``explist``
    Notifier so connected clients receive incremental updates.

    Note: the diff scrape had interleaved old/new lines here (duplicate
    ``__init__`` headers and a stale ``_scan_experiments(self.log_fn)``
    call); this is the reconstructed post-commit version.
    """
    def __init__(self, backend, log_fn):
        self.backend = backend
        self.log_fn = log_fn

        # Pin the head revision so its checkout stays alive between scans.
        self.head_rev = self.backend.get_head_rev()
        self.backend.request_rev(self.head_rev)
        self.explist = Notifier(dict())

        # Guards against overlapping scans triggered via scan_async().
        self._scanning = False

    @asyncio.coroutine
    def scan(self):
        if self._scanning:
            return
        self._scanning = True

        # Move the pinned revision forward to the new head: acquire the
        # new checkout before releasing the old one so the backend never
        # drops a revision that is still in use.
        new_head_rev = self.backend.get_head_rev()
        wd = self.backend.request_rev(new_head_rev)
        self.backend.release_rev(self.head_rev)
        self.head_rev = new_head_rev
        new_explist = yield from _scan_experiments(wd, self.log_fn)

        _sync_explist(self.explist, new_explist)
        self._scanning = False

    def scan_async(self):
        # Fire-and-forget scan on the running event loop.
        # ensure_future is the rename of the former asyncio.async(),
        # which is a SyntaxError on Python >= 3.7.
        asyncio.ensure_future(self.scan())


class FilesystemBackend:
    """Trivial repository backend: experiments live in a plain directory.

    Revisions are meaningless for a bare directory: every request is
    served from the same working directory and nothing needs releasing.
    """
    def __init__(self, root):
        # Normalize once so the returned path is stable regardless of cwd
        # at request time.
        self.root = os.path.abspath(root)

    def get_head_rev(self):
        # No version control here - report a placeholder revision string.
        return "N/A"

    def request_rev(self, rev):
        # Any revision maps onto the one and only working directory.
        return self.root

    def release_rev(self, rev):
        # Nothing to clean up for a plain directory.
        pass


class _GitCheckout:
    """Reference-counted checkout of one Git revision in a temp folder.

    Created with ``ref_count == 1``; the owner adjusts the count and
    calls dispose() when it drops to zero.
    """
    def __init__(self, git, rev):
        checkout_dir = tempfile.mkdtemp()
        # Materialize the tree of the requested revision into the fresh
        # temporary directory.
        git.checkout_tree(git.get(rev), directory=checkout_dir)
        self.path = checkout_dir
        self.ref_count = 1
        logger.info("checked out revision %s into %s", rev, self.path)

    def dispose(self):
        logger.info("disposing of checkout in folder %s", self.path)
        shutil.rmtree(self.path)


class GitBackend:
    """Repository backend serving experiments out of a Git repository.

    Each requested revision is checked out into its own temporary
    directory; checkouts are cached and reference-counted so a revision
    is only materialized once while in use.
    """
    def __init__(self, root):
        # lazy import - make dependency optional
        import pygit2

        self.git = pygit2.Repository(root)
        self.checkouts = dict()

    def get_head_rev(self):
        # The head target is a pygit2 Oid; callers expect a string.
        return str(self.git.head.target)

    def request_rev(self, rev):
        # Reuse an existing checkout when possible (EAFP), otherwise
        # create and cache a new one.
        try:
            co = self.checkouts[rev]
        except KeyError:
            co = _GitCheckout(self.git, rev)
            self.checkouts[rev] = co
        else:
            co.ref_count += 1
        return co.path

    def release_rev(self, rev):
        co = self.checkouts[rev]
        co.ref_count -= 1
        # Last user gone: remove the temporary checkout from disk and
        # from the cache.
        if co.ref_count == 0:
            co.dispose()
            del self.checkouts[rev]
34 changes: 25 additions & 9 deletions artiq/master/scheduler.py
Expand Up @@ -47,11 +47,12 @@ def worker_method(self, *args, **kwargs):

class Run:
def __init__(self, rid, pipeline_name,
expid, priority, due_date, flush,
wd, expid, priority, due_date, flush,
worker_handlers, notifier):
# called through pool
self.rid = rid
self.pipeline_name = pipeline_name
self.wd = wd
self.expid = expid
self.priority = priority
self.due_date = due_date
Expand Down Expand Up @@ -103,7 +104,8 @@ def close(self):

@asyncio.coroutine
def build(self):
yield from self._build(self.rid, self.pipeline_name, self.expid,
yield from self._build(self.rid, self.pipeline_name,
self.wd, self.expid,
self.priority)

prepare = _mk_worker_method("prepare")
Expand All @@ -124,18 +126,26 @@ def get(self):


class RunPool:
def __init__(self, ridc, worker_handlers, notifier):
def __init__(self, ridc, worker_handlers, notifier, repo_backend):
self.runs = dict()
self.submitted_cb = None

self._ridc = ridc
self._worker_handlers = worker_handlers
self._notifier = notifier
self._repo_backend = repo_backend

def submit(self, expid, priority, due_date, flush, pipeline_name):
# mutates expid to insert head repository revision if None
# called through scheduler
rid = self._ridc.get()
run = Run(rid, pipeline_name, expid, priority, due_date, flush,
if "repo_rev" in expid:
if expid["repo_rev"] is None:
expid["repo_rev"] = self._repo_backend.get_head_rev()
wd = self._repo_backend.request_rev(expid["repo_rev"])
else:
wd = None
run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
self._worker_handlers, self._notifier)
self.runs[rid] = run
if self.submitted_cb is not None:
Expand All @@ -147,7 +157,10 @@ def delete(self, rid):
# called through deleter
if rid not in self.runs:
return
yield from self.runs[rid].close()
run = self.runs[rid]
yield from run.close()
if "repo_rev" in run.expid:
self._repo_backend.release_rev(run.expid["repo_rev"])
del self.runs[rid]


Expand Down Expand Up @@ -280,12 +293,12 @@ def _do(self):


class Pipeline:
def __init__(self, ridc, deleter, worker_handlers, notifier):
def __init__(self, ridc, deleter, worker_handlers, notifier, repo_backend):
flush_tracker = WaitSet()
def delete_cb(rid):
deleter.delete(rid)
flush_tracker.discard(rid)
self.pool = RunPool(ridc, worker_handlers, notifier)
self.pool = RunPool(ridc, worker_handlers, notifier, repo_backend)
self._prepare = PrepareStage(flush_tracker, delete_cb,
self.pool, asyncio.Queue(maxsize=1))
self._run = RunStage(delete_cb,
Expand Down Expand Up @@ -348,11 +361,12 @@ def _do(self):


class Scheduler:
def __init__(self, next_rid, worker_handlers):
def __init__(self, next_rid, worker_handlers, repo_backend):
self.notifier = Notifier(dict())

self._pipelines = dict()
self._worker_handlers = worker_handlers
self._repo_backend = repo_backend
self._terminated = False

self._ridc = RIDCounter(next_rid)
Expand All @@ -374,14 +388,16 @@ def stop(self):
logger.warning("some pipelines were not garbage-collected")

def submit(self, pipeline_name, expid, priority, due_date, flush):
# mutates expid to insert head repository revision if None
if self._terminated:
return
try:
pipeline = self._pipelines[pipeline_name]
except KeyError:
logger.debug("creating pipeline '%s'", pipeline_name)
pipeline = Pipeline(self._ridc, self._deleter,
self._worker_handlers, self.notifier)
self._worker_handlers, self.notifier,
self._repo_backend)
self._pipelines[pipeline_name] = pipeline
pipeline.start()
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)
Expand Down

0 comments on commit 7ed8fe5

Please sign in to comment.