topijul
#!/usr/bin/env python3
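# Replay the first-parent history of a git branch into a pijul repository that
# lives in the same working directory, creating one pijul change per git commit.
# Usage (both arguments optional): topijul [branch] [base-revision]
# Defaults: branch = origin/master, base = the repository's root commit.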
import sys
from datetime import datetime
from itertools import islice
from os import environ, utime
from pathlib import Path
from subprocess import DEVNULL, PIPE, CalledProcessError
from subprocess import run as subrun
from time import sleep, time
# TODO
#
# * Get rid of the workarounds and hacks
# * Make it a package: pijul-tools
# * Add click
# * Add toml to temporarily set name, full_name, email
# * Add --continue to skip the base and make it easier to continue
# * Call it pijul-git
# * Add files post_condition
# * If normal repos never cause stat_line_diff != stat_line_show, remove stat_line_show
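# Run pijul non-interactively: VISUAL=/bin/true keeps it from opening an editor.
# track_count (-l10000) raises git's rename/copy-detection limit for diffs.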
batch = dict(environ)
batch["VISUAL"] = "/bin/true"
track_count = "-l10000"
# HACK HACK HACK, db locking in pijul is broken
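# Every pijul invocation goes through run(), which afterwards kills any
# leftover "pijul lock" process still holding the pristine db_lock.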
def run(*args, **kwargs):
    proc = subrun(*args, **kwargs)
    path = Path(".pijul/pristine/db_lock").absolute()
    subrun(["pkill", "-f", f"pijul lock {path}"])
    return proc
# HACK HACK HACK
def chunked_iterable(iterable, size):
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            break
        yield chunk
def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)
def checkout(rev):
    run(["git", "checkout", "-q", rev], check=True)
def clean():
    run(["git", "clean", "-xdfq", "-e", ".pijul"], check=True)
def git_ls_files():
    res = []
    for file_ in (
        run(["git", "ls-files"], check=True, stdout=PIPE)
        .stdout.decode("UTF-8")
        .splitlines()
    ):
        res.append(file_.strip())
    return res
def pijul_ls_files():
    res = []
    for file_ in (
        run(["pijul", "ls"], check=True, stdout=PIPE)
        .stdout.decode("UTF-8")
        .splitlines()
    ):
        if not Path(file_).is_dir():
            res.append(file_.strip())
    return res
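# Post-conditions checked after each imported commit: both tools must track the
# same set of files and git must not see any unrecorded changes.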
def post_condition_files():
    git_reset()
    pijul_reset()
    git = set(git_ls_files())
    pijul = set(pijul_ls_files())
    if git != pijul:
        eprint(f"git and pijul track a different set of files: {git ^ pijul}")
        sys.exit(1)
    restore()
def post_condition_diff():
    lines = (
        run(
            ["git", "-c", "core.fileMode=false", "status", "-s"],
            check=True,
            stdout=PIPE,
        )
        .stdout.decode("UTF-8")
        .splitlines()
    )
    res = []
    for line in lines:
        nline = line.strip()
        nline = nline.strip("?")
        nline = nline.strip("/")
        nline = nline.strip()
        if nline != ".pijul":
            res.append(line)
    if res:
        res = "\n".join(res)
        eprint(f"There are untracked changes:\n\n{res}")
        sys.exit(1)
def post_condition_record(out, err):
    if "Nothing to record" in err:
        patch = get_patch()
        if patch:
            eprint(f"Nothing to record, but there is a patch\n\n{patch}")
            sys.exit(1)
    else:
        if not out:
            eprint("Unknown error")
            sys.exit(1)
def git_reset():
    run(["git", "reset"], check=True)
def pijul_reset():
    run(["pijul", "reset"], check=True)
def stat(prev, rev):
    lines = (
        run(
            [
                "git",
                "diff",
                "--name-status",
                "--oneline",
                track_count,
                f"{prev}..{rev}",
            ],
            check=True,
            stdout=PIPE,
        )
        .stdout.decode("UTF-8")
        .splitlines()
    )
    stat_line = "no diff"
    try:
        stat_line = (
            run(
                ["git", "diff", "--stat", "--oneline", f"{prev}..{rev}"],
                check=True,
                stdout=PIPE,
            )
            .stdout.splitlines()[-1]
            .decode("UTF-8")
        )
    except IndexError:
        pass
    files = []
    renames = []
    for line in lines:
        _, _, file_ = line.partition("\t")
        file_ = file_.strip()
        a, _, b = file_.partition("\t")
        found = [Path(file_).absolute()]
        if b:
            a = Path(a.strip())
            b = Path(b.strip())
            renames.append((a, b))
            found = [a.absolute(), b.absolute()]
        for file_ in found:
            files.append(file_)
    return stat_line, files, renames
def restore():
    run(["git", "checkout", "--no-overlay", "-q", "."], check=True)
def add(files):
    for chunk in chunked_iterable(files, 100):
        run(["pijul", "add", "-f"] + chunk, check=True, stdout=PIPE, stderr=PIPE)
def add_recursive():
    run(["pijul", "add", "-r"], check=True)
def diff():
    run(["pijul", "diff", "--short", track_count], check=True, stdout=DEVNULL)
def record(log, author, timestamp):
    res = run(
        [
            "pijul",
            "record",
            "--all",
            "--timestamp",
            timestamp,
            "--author",
            author,
            "--message",
            log,
        ],
        check=True,
        env=batch,
        stdout=PIPE,
        stderr=PIPE,
    )
    return res.stdout.strip().decode("UTF-8"), res.stderr.strip().decode("UTF-8")
def reinit():
    run(["rm", "-rf", ".pijul"], check=True)
    run(["pijul", "init"], check=True)
def get_patch():
    lines = (
        run(["git", "show", "--patch", "--oneline"], check=True, stdout=PIPE)
        .stdout.decode("UTF-8")
        .splitlines()[1:]
    )
    res = []
    header = True
    for line in lines:
        if header:
            ignore = False
            if not line.strip():
                header = False
                continue
            if line.startswith("diff --git "):
                ignore = True
            elif line.startswith("old mode "):
                ignore = True
            elif line.startswith("new mode "):
                ignore = True
            if not ignore:
                res.append(line.strip())
        else:
            res.append(line.strip())
    return "\n".join(res).strip()
def get_base():
    return (
        run(["git", "rev-list", "--all"], check=True, stdout=PIPE)
        .stdout.splitlines()[-1]
        .decode("UTF-8")
    )
def show():
    return run(["git", "show", "-s"], check=True, stdout=PIPE).stdout.decode(
        "UTF-8", errors="ignore"
    )
def get_tag(res, tag, line):
    _, found, field = line.partition(f"{tag}:")
    if found:
        res[tag.lower()] = field.strip()
        return True
    return False
def parse_date(date):
    return datetime.strptime(" ".join(date.split(" ")[:-1]), "%a %b %d %H:%M:%S %Y")
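# Split "git show -s" output into commit message, author and commit date.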
def parse_log(log):
    out = []
    res = {}
    for line in log.splitlines():
        found = False
        for tag in ("Date", "Author"):
            if get_tag(res, tag, line):
                found = True
        if not found:
            out.append(line.strip())
    return "\n".join(out), res["author"], parse_date(res["date"])
def rev_list(branch, base):
    return (
        run(
            ["git", "rev-list", "--first-parent", f"{base}..{branch}"],
            check=True,
            stdout=PIPE,
        )
        .stdout.decode("UTF-8")
        .splitlines()
    ) + [base]
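# git checkout has already moved the file to its new name; move it back and
# let "pijul mv" perform the rename so pijul records it as a move.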
def rename(a, b):
    a.parent.mkdir(parents=True, exist_ok=True)
    b.parent.mkdir(parents=True, exist_ok=True)
    b.rename(a)
    try:
        run(["pijul", "mv", a, b], check=True)
        return True
    except CalledProcessError:
        return False
def force_reset():
    git_reset()
    clean()
    restore()
def track_rename(files, renames):
    rename_fail = False
    for a, b in renames:
        files.append(a.absolute())
        files.append(b.absolute())
        if not rename(a, b):
            rename_fail = True
    if rename_fail:
        eprint("Could not track all renames, this is not an error")
    force_reset()
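# Arguments: optional branch (default origin/master) and optional base
# revision (default: the repository's root commit).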
def get_args():
    arg_len = len(sys.argv)
    branch = None
    base = None
    if arg_len > 1:
        branch = sys.argv[1]
    if arg_len > 2:
        base = sys.argv[2]
    if not branch:
        branch = "origin/master"
    if not base:
        base = get_base()
    return branch, base
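# Replays the revisions oldest first: checkout, stat, add, record. If a step
# fails, recovery mode folds the failed commit into the next recorded change.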
class Runner:
    def __init__(self, revs):
        self.recover = False
        self.revs = revs
        self.wr = Workaround()
        self.here = Path(".").absolute()
    def run(self):
        prev = self.revs.pop()
        rev = self.revs.pop()
        checkout(rev)
        add(git_ls_files())
        while self.revs:
            if self.step(prev, rev):
                prev = rev
            rev = self.revs.pop()
        self.step(prev, rev)
    def prepare(self, prev, rev):
        checkout(rev)
        stat_line, files, renames = stat(prev, rev)
        print(f"{stat_line}, {len(renames)} renames")
        self.wr.update(files)
        log = show()
        log, author, date = parse_log(log)
        timestamp = str(int(date.timestamp()))
        if renames:
            track_rename(files, renames)
        files = [str(file_.relative_to(self.here)) for file_ in files if file_.exists()]
        return files, log, author, timestamp
    def handle_error(self, rev, hash_, e):
        cmd = []
        for arg in e.cmd:
            if "\n" in arg or " " in arg or "\t" in arg:
                cmd.append(f'"{arg}"')
            else:
                cmd.append(arg)
        eprint(f"{rev} -> {hash_}")
        eprint(f"Error, trying to recover\ncmd: {' '.join(cmd)}\ncode: {e.returncode}")
        if e.stdout:
            eprint(f"out: {e.stdout.decode('UTF-8')}")
        if e.stderr:
            eprint(f"err: {e.stderr.decode('UTF-8')}")
        sleep(1)
        try:
            add_recursive()
        except CalledProcessError:
            pass
        if self.recover:
            raise
        self.recover = True
    def posts(self, out, err):
        if not self.recover:
            post_condition_record(out, err)
        post_condition_files()
        post_condition_diff()
    def step(self, prev, rev):
        hash_ = None
        try:
            files, log, author, timestamp = self.prepare(prev, rev)
            add(files)
            out, err = record(log, author, timestamp)
            _, _, hash_ = out.partition("Hash:")
            hash_ = hash_.strip()
        except CalledProcessError as e:
            self.handle_error(rev, hash_, e)
            return False
        print(f"{rev} -> {hash_}")
        try:
            self.posts(out, err)
        except CalledProcessError:
            sleep(1)
            self.posts(out, err)
        # run(["pijul", "fork", rev], check=True)
        self.recover = False
        return True
# Pijul can't detect changes if we don't change the timestamp
class Workaround:
    def __init__(self):
        self.ts = 0
    def increment(self):
        new_ts = max(int(time()), self.ts) + 1
        assert new_ts > self.ts
        self.ts = new_ts
    def update(self, files):
        self.increment()
        for file_ in files:
            if file_.exists():
                utime(file_, (self.ts, self.ts))
def main():
    branch, base = get_args()
    revs = rev_list(branch, base)
    force_reset()
    runner = Runner(revs)
    runner.run()
if __name__ == "__main__":
    main()