-
-
Save ganwell/d15cc230936d7c8bc00dc7f96b32d7aa to your computer and use it in GitHub Desktop.
topijul
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import shlex
import sys
from datetime import datetime
from itertools import islice
from os import environ, utime
from pathlib import Path
from subprocess import DEVNULL, PIPE, CalledProcessError
from subprocess import run as subrun
from time import sleep, time
# TODO
#
# * Get rid of the workarounds and hacks
# * Make it a package: pijul-tools
# * Add a click-based command-line interface
# * Add a toml file to temporarily set name, full_name, email
# * Add --continue to skip the base and make it easier to resume a conversion
# * Call it pijul-git
# * Add files post_condition
# * If normal repos do not cause stat_line_diff != stat_line_show, remove stat_line_show
batch = dict(environ) | |
batch["VISUAL"] = "/bin/true" | |
track_count = "-l10000" | |
# HACK HACK HACK, db locking in pijul is broken
def run(*args, **kwargs):
    """Run a command, then kill any leftover ``pijul lock`` process.

    All arguments are forwarded unchanged to ``subprocess.run`` and its
    result is returned. The ``pkill`` only happens when the command itself
    did not raise; the exit status of ``pkill`` is ignored.
    """
    completed = subrun(*args, **kwargs)
    lock_file = Path(".pijul/pristine/db_lock").absolute()
    pattern = f"pijul lock {lock_file}"
    subrun(["pkill", "-f", pattern])
    return completed


# HACK HACK HACK
def chunked_iterable(iterable, size):
    """Yield consecutive lists of at most ``size`` items from ``iterable``.

    The last list may be shorter; nothing is yielded for an empty input.
    """
    source = iter(iterable)
    # iter(callable, sentinel) keeps pulling chunks until an empty one appears.
    yield from iter(lambda: list(islice(source, size)), [])
def eprint(*args, **kwargs):
    """Like ``print``, but always writes to the current ``sys.stderr``."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
def checkout(rev):
    """Quietly check out ``rev`` with git.

    Raises:
        CalledProcessError: if git exits with a non-zero status.
    """
    command = ["git", "checkout", "-q", rev]
    run(command, check=True)
def clean():
    """Run ``git clean -xdfq`` while excluding ``.pijul``.

    Raises:
        CalledProcessError: if git exits with a non-zero status.
    """
    command = ["git", "clean", "-xdfq", "-e", ".pijul"]
    run(command, check=True)
def git_ls_files():
    """Return the paths tracked by git, one string per file.

    ``git ls-files`` is called with ``-z`` so paths come back NUL-terminated
    and verbatim. In the default line-based output git wraps names that
    contain non-ASCII or special characters in double quotes with escapes;
    such quoted names neither exist on disk (so ``pijul add`` fails) nor
    compare equal to what ``pijul ls`` prints.

    Raises:
        CalledProcessError: if git exits with a non-zero status.
    """
    listing = run(["git", "ls-files", "-z"], check=True, stdout=PIPE).stdout
    # The trailing NUL produces one empty entry, which is skipped. strip() is
    # kept so names are normalized exactly as the line-based version did.
    return [name.strip() for name in listing.decode("UTF-8").split("\0") if name]
def pijul_ls_files():
    """Return the entries of ``pijul ls`` that are not directories on disk.

    Raises:
        CalledProcessError: if pijul exits with a non-zero status.
    """
    listing = run(["pijul", "ls"], check=True, stdout=PIPE).stdout.decode("UTF-8")
    return [
        entry.strip() for entry in listing.splitlines() if not Path(entry).is_dir()
    ]
def post_condition_files():
    """Check that git and pijul track the same set of files.

    Both tools are reset first. On a mismatch the differing paths are printed
    to stderr and the process exits with status 1; otherwise the git working
    tree is restored.
    """
    git_reset()
    pijul_reset()
    tracked_by_git = set(git_ls_files())
    tracked_by_pijul = set(pijul_ls_files())
    mismatch = tracked_by_git ^ tracked_by_pijul
    if mismatch:
        eprint(f"git and pijul track a different set of files: {mismatch}")
        sys.exit(1)
    restore()
def post_condition_diff():
    """Check that ``git status -s`` reports nothing besides ``.pijul``.

    File-mode changes are ignored (``core.fileMode=false``). Any other status
    line is printed to stderr and the process exits with status 1.
    """
    status = run(
        ["git", "-c", "core.fileMode=false", "status", "-s"],
        check=True,
        stdout=PIPE,
    ).stdout.decode("UTF-8")
    # Peel the "??" marker and the trailing "/" off each line so the untracked
    # ".pijul" directory entry can be recognized and skipped.
    leftovers = [
        entry
        for entry in status.splitlines()
        if entry.strip().strip("?").strip("/").strip() != ".pijul"
    ]
    if not leftovers:
        return
    report = "\n".join(leftovers)
    eprint(f"There are untracked changes:\n\n{report}")
    sys.exit(1)
def post_condition_record(out, err):
    """Sanity-check the output of ``pijul record``.

    Args:
        out: decoded stdout of the record command.
        err: decoded stderr of the record command.

    Exits with status 1 when pijul reported "Nothing to record" although
    get_patch() returns a non-empty patch, or when pijul did not report that
    and produced no stdout at all.
    """
    if "Nothing to record" not in err:
        if not out:
            eprint("Unknown error")
            sys.exit(1)
        return
    patch = get_patch()
    if patch:
        eprint(f"Nothing to record, but there is a patch\n\n{patch}")
        sys.exit(1)
def git_reset():
    """Run ``git reset``.

    Raises:
        CalledProcessError: if git exits with a non-zero status.
    """
    command = ["git", "reset"]
    run(command, check=True)
def pijul_reset():
    """Run ``pijul reset``.

    Raises:
        CalledProcessError: if pijul exits with a non-zero status.
    """
    command = ["pijul", "reset"]
    run(command, check=True)
def stat(prev, rev):
    """Summarize what changed between two git revisions.

    Args:
        prev: older revision.
        rev: newer revision.

    Returns:
        A tuple ``(stat_line, files, renames)``:
        * ``stat_line`` - last line of ``git diff --stat`` or ``"no diff"``
          when that command prints nothing.
        * ``files`` - absolute ``Path`` of every path named in the
          ``--name-status`` listing (both sides of two-path entries).
        * ``renames`` - ``(old, new)`` relative ``Path`` pairs for entries
          git reports as renames.

    Raises:
        CalledProcessError: if either git command exits non-zero.
    """
    span = f"{prev}..{rev}"
    listing = run(
        ["git", "diff", "--name-status", "--oneline", track_count, span],
        check=True,
        stdout=PIPE,
    ).stdout.decode("UTF-8")
    summary = run(
        ["git", "diff", "--stat", "--oneline", span],
        check=True,
        stdout=PIPE,
    ).stdout.splitlines()
    stat_line = summary[-1].decode("UTF-8") if summary else "no diff"

    files = []
    renames = []
    for line in listing.splitlines():
        # Format: "<status>\t<path>" or "<status>\t<old>\t<new>".
        status, _, paths = line.partition("\t")
        paths = paths.strip()
        first, _, second = paths.partition("\t")
        if not second:
            files.append(Path(paths).absolute())
            continue
        old = Path(first.strip())
        new = Path(second.strip())
        # Only "R..." entries are renames. A copy ("C...") also has two paths,
        # but its source stays in place, so it must not be replayed as a move.
        if status.startswith("R"):
            renames.append((old, new))
        files.extend((old.absolute(), new.absolute()))
    return stat_line, files, renames
def restore():
    """Run ``git checkout --no-overlay -q .`` in the current directory.

    Raises:
        CalledProcessError: if git exits with a non-zero status.
    """
    command = ["git", "checkout", "--no-overlay", "-q", "."]
    run(command, check=True)
def add(files):
    """Run ``pijul add -f`` on ``files``, at most 100 paths per invocation.

    Raises:
        CalledProcessError: if any pijul invocation exits non-zero.
    """
    for paths in chunked_iterable(files, 100):
        command = ["pijul", "add", "-f", *paths]
        run(command, check=True, stdout=PIPE, stderr=PIPE)
def add_recursive():
    """Run ``pijul add -r``.

    Raises:
        CalledProcessError: if pijul exits with a non-zero status.
    """
    command = ["pijul", "add", "-r"]
    run(command, check=True)
def diff():
    """Run ``pijul diff --short`` and discard its standard output.

    Raises:
        CalledProcessError: if pijul exits with a non-zero status.
    """
    command = ["pijul", "diff", "--short", track_count]
    run(command, check=True, stdout=DEVNULL)
def record(log, author, timestamp):
    """Record all pending changes in pijul.

    Args:
        log: message for the record.
        author: author string for the record.
        timestamp: timestamp string passed to ``--timestamp``.

    Returns:
        ``(stdout, stderr)`` of pijul, stripped and decoded as UTF-8.

    Raises:
        CalledProcessError: if pijul exits with a non-zero status.
    """
    command = ["pijul", "record", "--all"]
    command += ["--timestamp", timestamp]
    command += ["--author", author]
    command += ["--message", log]
    done = run(command, check=True, env=batch, stdout=PIPE, stderr=PIPE)
    out = done.stdout.strip().decode("UTF-8")
    err = done.stderr.strip().decode("UTF-8")
    return out, err
def reinit():
    """Delete ``.pijul`` and run ``pijul init`` again.

    Raises:
        CalledProcessError: if either command exits non-zero.
    """
    for command in (["rm", "-rf", ".pijul"], ["pijul", "init"]):
        run(command, check=True)
def get_patch():
    """Return the patch text of ``git show --patch --oneline``, filtered.

    The first output line (the one-line commit summary) is dropped. Until the
    first blank line is seen, lines starting with ``diff --git``, ``old mode``
    or ``new mode`` are skipped; that blank line itself is dropped too. All
    remaining lines are stripped and joined with newlines.

    Raises:
        CalledProcessError: if git exits with a non-zero status.
    """
    output = run(
        ["git", "show", "--patch", "--oneline"], check=True, stdout=PIPE
    ).stdout.decode("UTF-8")
    skipped_prefixes = ("diff --git ", "old mode ", "new mode ")
    kept = []
    in_header = True
    for raw in output.splitlines()[1:]:
        text = raw.strip()
        if in_header:
            if not text:
                # First blank line ends the filtered section.
                in_header = False
                continue
            if raw.startswith(skipped_prefixes):
                continue
        kept.append(text)
    return "\n".join(kept).strip()
def get_base():
    """Return the last commit hash printed by ``git rev-list --all``.

    Used as the default base revision (presumably the oldest commit).

    Raises:
        CalledProcessError: if git exits with a non-zero status.
        IndexError: if git prints no revisions.
    """
    revisions = run(
        ["git", "rev-list", "--all"], check=True, stdout=PIPE
    ).stdout.splitlines()
    return revisions[-1].decode("UTF-8")
def show():
    """Return the output of ``git show -s`` as text.

    Bytes that are not valid UTF-8 are silently dropped.

    Raises:
        CalledProcessError: if git exits with a non-zero status.
    """
    raw = run(["git", "show", "-s"], check=True, stdout=PIPE).stdout
    return raw.decode("UTF-8", errors="ignore")
def get_tag(res, tag, line):
    """Store the value of a ``<tag>:`` header line in ``res``.

    Only lines that *start* with ``<tag>:`` count. Matching the tag anywhere
    in the line would let indented commit-message text such as
    ``Release Date: ...`` overwrite the real header value.

    Args:
        res: dict that receives the value under ``tag.lower()``.
        tag: header name without the colon, e.g. ``"Date"``.
        line: one line of log output.

    Returns:
        True if the line was a header for ``tag``, False otherwise.
    """
    prefix = f"{tag}:"
    if not line.startswith(prefix):
        return False
    res[tag.lower()] = line[len(prefix):].strip()
    return True
def parse_date(date):
    """Parse a git default-format date such as ``Mon Jan 6 10:00:00 2020 +0100``.

    The trailing UTC offset is parsed with ``%z`` instead of being thrown
    away, so the result is timezone-aware and ``.timestamp()`` yields the
    commit's real epoch time regardless of the local timezone.

    Raises:
        ValueError: if ``date`` does not match the expected format.
    """
    return datetime.strptime(date.strip(), "%a %b %d %H:%M:%S %Y %z")
def parse_log(log):
    """Split log text into message, author and date.

    Lines recognized by get_tag() as ``Date`` or ``Author`` are consumed;
    every other line is stripped and kept as part of the message.

    Returns:
        ``(message, author, date)`` where ``date`` is the result of
        parse_date().

    Raises:
        KeyError: if no author or date line was found.
    """
    tags = ("Date", "Author")
    fields = {}
    message = []
    for line in log.splitlines():
        # A list (not a generator) so every tag is tried on every line.
        matched = [get_tag(fields, tag, line) for tag in tags]
        if not any(matched):
            message.append(line.strip())
    return "\n".join(message), fields["author"], parse_date(fields["date"])
def rev_list(branch, base):
    """List first-parent revisions of ``base..branch`` followed by ``base``.

    The order is whatever ``git rev-list`` prints, with ``base`` appended as
    the final element.

    Raises:
        CalledProcessError: if git exits with a non-zero status.
    """
    command = ["git", "rev-list", "--first-parent", f"{base}..{branch}"]
    revisions = run(command, check=True, stdout=PIPE).stdout.decode("UTF-8").splitlines()
    revisions.append(base)
    return revisions
def rename(a, b):
    """Replay a rename from ``a`` to ``b`` through ``pijul mv``.

    The file currently at ``b`` is first moved back to ``a`` on disk (parent
    directories are created as needed), then ``pijul mv a b`` is run.

    Returns:
        True if ``pijul mv`` succeeded, False if it exited non-zero.
    """
    for target in (a, b):
        target.parent.mkdir(parents=True, exist_ok=True)
    b.rename(a)
    try:
        run(["pijul", "mv", a, b], check=True)
    except CalledProcessError:
        return False
    return True
def force_reset():
    """Bring the git working tree back in order: reset, clean, then restore."""
    for action in (git_reset, clean, restore):
        action()
def track_rename(files, renames):
    """Replay every rename in pijul and register both paths in ``files``.

    Args:
        files: list that is extended in place with the absolute old and new
            path of each rename.
        renames: iterable of ``(old, new)`` path pairs.

    If at least one rename could not be tracked, a note is printed to stderr
    and the git working tree is force-reset.
    """
    all_tracked = True
    for old, new in renames:
        files.extend((old.absolute(), new.absolute()))
        if not rename(old, new):
            all_tracked = False
    if all_tracked:
        return
    eprint("Could not track all renames, this is not an error")
    force_reset()
def get_args():
    """Read ``branch`` and ``base`` from the command line.

    The first positional argument is the branch (default ``origin/master``),
    the second the base revision (default: result of get_base()). Empty
    arguments are treated as missing.

    Returns:
        ``(branch, base)``.
    """
    given = sys.argv[1:3]
    branch = given[0] if len(given) > 0 else None
    base = given[1] if len(given) > 1 else None
    return branch or "origin/master", base or get_base()
class Runner:
    """Replays git revisions into pijul, one pijul record per revision.

    ``revs`` is consumed from its end with ``pop()``: the last element is the
    base revision, the one before it the first revision to record.
    """

    def __init__(self, revs):
        # True while the previous step failed and is being retried.
        self.recover = False
        self.revs = revs
        self.wr = Workaround()
        self.here = Path(".").absolute()

    def run(self):
        """Record every revision in ``self.revs`` on top of the base.

        A failed step is retried once; a second consecutive failure raises
        the underlying CalledProcessError (see handle_error).
        """
        if len(self.revs) < 2:
            # Only the base (or nothing) is left: popping twice would raise
            # IndexError, and there is no revision to record anyway.
            print(
                "Nothing to convert: need a base and at least one revision",
                file=sys.stderr,
            )
            return
        prev = self.revs.pop()
        rev = self.revs.pop()
        checkout(rev)
        add(git_ls_files())
        while self.revs:
            if self.step(prev, rev):
                prev = rev
                rev = self.revs.pop()
        # The last revision gets the same single retry as all the others;
        # without it a failure here would be dropped and the run would end
        # looking successful.
        if not self.step(prev, rev):
            self.step(prev, rev)

    def prepare(self, prev, rev):
        """Check out ``rev`` and collect everything needed to record it.

        Returns:
            ``(files, log, author, timestamp)`` where ``files`` are existing
            changed paths relative to the start directory and ``timestamp``
            is the commit date as an integer epoch string.
        """
        checkout(rev)
        stat_line, files, renames = stat(prev, rev)
        print(f"{stat_line}, {len(renames)} renames")
        # Bump mtimes so pijul notices the changed files.
        self.wr.update(files)
        log = show()
        log, author, date = parse_log(log)
        timestamp = str(int(date.timestamp()))
        if renames:
            track_rename(files, renames)
        files = [str(file_.relative_to(self.here)) for file_ in files if file_.exists()]
        return files, log, author, timestamp

    def handle_error(self, rev, hash_, e):
        """Report a failed command and switch to recovery mode.

        Prints the command and its output to stderr, waits a second and tries
        ``pijul add -r``. If the runner was already recovering, ``e`` is
        re-raised; otherwise ``self.recover`` is set so the caller can retry.
        """
        # shlex.quote yields a copy-pasteable command line even when an
        # argument (e.g. the commit message) contains quotes or newlines.
        cmd = " ".join(shlex.quote(str(arg)) for arg in e.cmd)
        eprint(f"{rev} -> {hash_}")
        eprint(f"Error, trying to recover\ncmd: {cmd}\ncode: {e.returncode}")
        # errors="replace": a decoding problem must not mask the real error.
        if e.stdout:
            eprint(f"out: {e.stdout.decode('UTF-8', errors='replace')}")
        if e.stderr:
            eprint(f"err: {e.stderr.decode('UTF-8', errors='replace')}")
        sleep(1)
        try:
            add_recursive()
        except CalledProcessError:
            # Best effort only; the retry decides whether recovery worked.
            pass
        if self.recover:
            raise e
        self.recover = True

    def posts(self, out, err):
        """Run the post-conditions; the record check is skipped while recovering."""
        if not self.recover:
            post_condition_record(out, err)
        post_condition_files()
        post_condition_diff()

    def step(self, prev, rev):
        """Record the changes from ``prev`` to ``rev`` in pijul.

        Returns:
            True when the revision was recorded and the post-conditions ran,
            False when a command failed and the step should be retried.
        """
        hash_ = None
        try:
            files, log, author, timestamp = self.prepare(prev, rev)
            add(files)
            out, err = record(log, author, timestamp)
            _, _, hash_ = out.partition("Hash:")
            hash_ = hash_.strip()
        except CalledProcessError as e:
            self.handle_error(rev, hash_, e)
            return False
        print(f"{rev} -> {hash_}")
        try:
            self.posts(out, err)
        except CalledProcessError:
            # One retry after a short pause.
            sleep(1)
            self.posts(out, err)
        # run(["pijul", "fork", rev], check=True)
        self.recover = False
        return True
# Pijul can't detect changes if we don't change the timestamp
class Workaround:
    """Hands out strictly increasing timestamps and stamps files with them."""

    def __init__(self):
        # Last timestamp handed out, in whole seconds.
        self.ts = 0

    def increment(self):
        """Advance ``self.ts`` to one second past max(now, previous value)."""
        previous = self.ts
        candidate = max(int(time()), previous) + 1
        assert candidate > previous
        self.ts = candidate

    def update(self, files):
        """Set atime and mtime of every existing path in ``files`` to a new timestamp."""
        self.increment()
        stamp = (self.ts, self.ts)
        for path in files:
            if path.exists():
                utime(path, stamp)
def main():
    """Convert the requested git branch into pijul records.

    Resolves branch and base from the command line, lists the revisions,
    force-resets the working tree and hands the revisions to a Runner.
    """
    revisions = rev_list(*get_args())
    force_reset()
    Runner(revisions).run()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment