Last active
March 5, 2018 20:42
-
-
Save lauradoepker/7dcf78b5a6cca3a57f5efe54f6a2d228 to your computer and use it in GitHub Desktop.
Pruning script for seeded antibody clonal families
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
process_asr.py | |
purpose: Given an outputfile from one of the PHYLIP tools - `dnaml` or `dnapars` - produce an alignment (including ancestral sequences) and a newick tree (with matching internal node lables) | |
usage: dependency of prune.py | |
authors (in order of contribution): Chris Small, William DeWitt, Chris Warth | |
note: this is part of a larger package that is in development | |
dependencies: etetoolkit.org, biopython.org, | |
""" | |
from __future__ import print_function | |
import argparse | |
import ete3 | |
import csv | |
import re | |
import os | |
from warnings import warn | |
from collections import defaultdict | |
from Bio import SeqIO | |
from Bio.Seq import Seq | |
from Bio.SeqRecord import SeqRecord | |
# iterate over recognized sections in the phylip output file. | |
def sections(fh): | |
patterns = { | |
"parents": "\s+between\s+and\s+length", | |
("sequences", 'dnaml'): "\s*node\s+reconstructed\s+sequence", | |
('sequences', 'dnapars'): "from\s+to\s+any steps"} | |
patterns = {k: re.compile(v, re.IGNORECASE) for (k,v) in patterns.items()} | |
for line in fh: | |
for k, pat in patterns.items(): | |
if pat.match(line): | |
yield k | |
break | |
# iterate over entries in the distance section | |
def iter_edges(fh): | |
# 152 >naive2 0.01208 ( zero, 0.02525) ** | |
pat = re.compile("\s*(?P<parent>\w+)\s+(?P<child>[\w>_.-]+)\s+(?P<distance>\d+\.\d+)") | |
# drop the header underline | |
fh.next() | |
matches = 0 | |
for line in fh: | |
m = pat.match(line) | |
if m: | |
matches += 1 | |
yield (m.group("child"), m.group("parent"), float(m.group("distance"))) | |
# We only want to break at the very end of the block of matches; dnaml has an extra blank line between | |
# header and rows that dnapars doesn't | |
elif not matches: | |
continue | |
else: | |
break | |
# iterate over entries in the sequences section | |
def parse_seqdict(fh, mode='dnaml'): | |
# 152 sssssssssG AGGTGCAGCT GTTGGAGTCT GGGGGAGGCT TGGTACAGCC TGGGGGGTCC | |
seqs = defaultdict(str) | |
patterns = { | |
'dnaml': re.compile("^\s*(?P<id>[a-zA-Z0-9>_.-]*)\s+(?P<seq>[a-zA-Z \-]+)"), | |
'dnapars': re.compile("^\s*\S+\s+(?P<id>[a-zA-Z0-9>_.-]*)\s+(yes\s+|no\s+|maybe\s+)?(?P<seq>[a-zA-Z \-]+)")} | |
fh.next() | |
for line in fh: | |
m = patterns[mode].match(line) | |
if m: | |
seqs[m.group("id")] += m.group("seq").replace(" ", "") | |
elif line.rstrip() == '': | |
continue | |
else: | |
break | |
return seqs | |
# parse the dnaml output file and return data strictures containing a | |
# list biopython.SeqRecords and a dict containing adjacency | |
# relationships and distances between nodes. | |
def parse_outfile(outfile, seqmeta, seqname_mapping=None): | |
'''parse phylip outfile''' | |
name_map = seqname_mapping or {seq_id[0:10]: seq_id for seq_id in seqmeta} | |
if len(name_map) != len(seqmeta) and not seqname_mapping: | |
raise RuntimeError("Conflicting shortened names!") | |
# Keeping track of which names match up for validation and error handling | |
goods = set() | |
bads = set() | |
def full_name(x): | |
name = name_map.get(x) | |
if not name and not re.match('\d*$', x): | |
bads.add(x) | |
elif name: | |
goods.add(x) | |
return name or x | |
# Ugg... for compilation need to let python know that these will definely both be defined :-/ | |
sequences, parents = [], {} | |
with open(outfile, 'rU') as fh: | |
for sect in sections(fh): | |
if sect == 'parents': | |
parents = { full_name(parent): (full_name(child), dist) for parent, child, dist in iter_edges(fh) } | |
elif sect[0] == 'sequences': | |
d = parse_seqdict(fh, sect[1]) | |
sequences = [ SeqRecord(Seq(seq), id=full_name(seq_id), description="") | |
for seq_id, seq in d.items() ] | |
else: | |
raise RuntimeError("unrecognized phylip setion = {}".format(sect)) | |
if 'naive' in bads: | |
warn("Uh... naive in bad sequences?") | |
bads.remove('naive') | |
if bads: | |
print("good sequences:", sorted(goods)) | |
print("bad sequences:", sorted(bads)) | |
print("name_map misses:") | |
for k, v in sorted(name_map.items()): | |
if k not in goods: | |
print(" ", k, ":", v) | |
raise RuntimeError("mismatched sequence/node names") | |
# sanity check; a valid tree should have exactly one node that is parentless | |
if not len(parents) == len(sequences) - 1: | |
raise RuntimeError('invalid results attempting to parse {}: there are {} parentless sequences'.format(outfile, len(sequences) - len(parents))) | |
return sequences, parents | |
# build a tree from a set of sequences and an adjacency dict. | |
def build_tree(sequences, parents): | |
# build an ete tree | |
# first a dictionary of disconnected nodes | |
def mknode(r): | |
n = ete3.Tree( | |
name=r.id, | |
dist=parents[r.id][1] if r.id in parents else 0, | |
format=1) | |
n.add_features(seq=r) | |
return n | |
nodes = { r.id: mknode(r) for r in sequences } | |
# connect the nodes using the parent data | |
orphan_nodes = 0 | |
for r in sequences: | |
if r.id in parents: | |
nodes[parents[r.id][0]].add_child(nodes[r.id]) | |
else: | |
# node without parent becomes root | |
tree = nodes[r.id] | |
orphan_nodes += 1 | |
# there can only be one root | |
if orphan_nodes != 1: | |
raise RuntimeError("The tree is not properly rooted; expected a single root but there are {}.".format(orphan_nodes)) | |
return tree | |
def find_node(tree, pattern): | |
regex = re.compile(pattern).search | |
nodes = [ node for node in tree.traverse() for m in [regex(node.name)] if m] | |
if not nodes: | |
warn("Cannot find matching node; looking for name matching '{}'".format(pattern)) | |
return | |
else: | |
if len(nodes) > 1: | |
warn("multiple nodes found; using first one.\nfound: {}".format([n.name for n in nodes])) | |
return nodes[0] | |
# reroot the tree on node matching regex pattern. | |
# Usually this is used to root on the naive germline sequence with name 'naive' | |
def reroot_tree(tree, pattern='naive'): | |
# find all nodes matching pattern | |
node = find_node(tree, pattern) | |
if tree != node: | |
# In general this would be necessary, but we are actually assuming that naive has been set as an | |
# outgroup in dnaml, and if it hasn't, we want to raise an error, as below | |
#tree.set_outgroup(node) | |
# Raise error if naive isn't in root's children | |
if node not in tree.children: | |
raise ValueError("Naive node not set as outgroup; Check dnaml/asr run to make sure this is the case") | |
# This actually assumes the `not in` condition above, but we check as above for clarity | |
tree.remove_child(node) | |
node.add_child(tree) | |
tree.dist = node.dist | |
node.dist = 0 | |
tree = node | |
return tree | |
def seqname_mapping_arg(filename): | |
with open(filename, 'r') as fh: | |
return {row['new_id']: row['original_id'] for row in csv.DictReader(fh)} | |
def get_args(): | |
def seqmeta_input(fname): | |
with open(fname, 'r') as fhandle: | |
return dict((row['sequence'], row) for row in csv.DictReader(fhandle)) | |
def existing_file(fname): | |
""" | |
Argparse type for an existing file | |
""" | |
if not os.path.isfile(fname): | |
raise ValueError("Invalid file: " + str(fname)) | |
return fname | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument( | |
'phylip_outfile', type=existing_file, | |
help='dnaml outfile (verbose output with inferred ancestral sequences, option 5).') | |
parser.add_argument( | |
'seqmeta', type=seqmeta_input, | |
help="Sequence metadata file for annotating mut_freqs") | |
parser.add_argument( | |
'--seqname-mapping', | |
type=seqname_mapping_arg, | |
help="optional csv file translating original_id to new_id for phylip 10 char cludgery") | |
parser.add_argument( | |
'--outdir', default='.', | |
help="output directory where results are left. [default '%(default)s']") | |
parser.add_argument( | |
'--basename', help="basename of output files. [default 'basename(DNAML)']") | |
parser.add_argument( | |
'--seed', type=str, help="id of leaf [default 'seed']", default='seed') | |
return parser.parse_args() | |
def main(): | |
args = get_args() | |
# basename of outputfiles is taken from --basename if supplied, otherwise basename of DNAML. | |
basename = args.basename if args.basename else os.path.basename(args.phylip_outfile) | |
outbase = os.path.join(args.outdir, os.path.splitext(basename)[0]) | |
sequences, parents = parse_outfile(args.phylip_outfile, args.seqmeta, args.seqname_mapping) | |
if not sequences or not parents: | |
raise RuntimeError("No sequences were available; are you sure this is a dnaml output file?") | |
# write alignment to fasta | |
fname = outbase + '.fa' | |
with open(fname, "w") as fh: | |
SeqIO.write(sequences, fh, "fasta") | |
tree = build_tree(sequences, parents) | |
tree = reroot_tree(tree, 'naive') | |
# write newick file | |
fname = outbase + '.nwk' | |
tree.write(format=1, format_root_node=True, outfile=fname) | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
prune.py | |
purpose: Return a reduced set of taxa based on either seed lineage selection ("pruning") or min ADCL selection ("trimming") | |
usage: Antibody (BF520.1) lineage reconstruction. This script pares down a list (fasta file input) of clonal antibody sequences to 100 sequences for further analysis by BEAST. | |
authors (in order of contribution): William DeWitt, Frederick A Matsen IV, Chris Small, Chris Warth, Laura Noges | |
note: this is part of a larger package that is in development | |
dependencies: etetoolkit.org, biopython.org, github.com/matsen/pplacer | |
""" | |
from ete3 import Tree | |
from process_asr import find_node, reroot_tree | |
import subprocess | |
import argparse | |
import sys | |
def seed_lineage_selection(args): | |
""" | |
Take the closest taxa on subtrees branching off the root (naive) to seed lineage. | |
E.g. given (((seed:1,(l3:1,l4:2)s2:1,(l5:3,l6:4)s3:1)b:1,(l1:1,l2:2)s1:1),naive:1); | |
/-seed | |
| | |
| /-l3 | |
/-|--| | |
| | \-l4 | |
| | | |
| | /-l5 | |
/-| \-| | |
| | \-l6 | |
| | | |
--| | /-l1 | |
| \-| | |
| \-l2 | |
| | |
\-naive | |
return (naive0, l3, l1, l5, seed) | |
We cycle through these subtrees from root to seed, always taking the | |
closest taxon to the root to seed lineage, until we have gotten the required | |
number of taxa. | |
""" | |
tree = args.tree | |
# Set args.n_keep to the min of requested value and the actual number of seqs (make sure to do this before | |
# rerooting, or len(tree) will have decremented...) | |
n_keep = min(args.n_keep, len(tree)) | |
# Reroot the tree on naive | |
naive_node = find_node(tree, args.naive) | |
tree.set_outgroup(naive_node) | |
tree = reroot_tree(tree, args.naive) | |
# Collect ids of nodes on the seed lineage. | |
seed_node = find_node(tree, args.seed) | |
if args.seed is 'seed' and seed_node is None: | |
seed_node = tree.get_farthest_leaf()[0] | |
distance_dict = {} | |
def distances(lineage_node, leaf): | |
if leaf not in distance_dict: | |
distance_dict[leaf] = lineage_node.get_distance(leaf) | |
return distance_dict[leaf] | |
# Iterate over seed lineage and find closest taxon from each branch, and | |
# repeat until we have n_keep leaf sequences. | |
leaves_to_keep = set(find_node(tree, n) for n in args.always_include if tree.get_leaves_by_name(n)) | |
# Extract the sequence of nodes on lineage from root to seed. | |
# Note: ete doc suggests get_ancestors() would include the seed node, but it doesn't. | |
# "Returns the list of all ancestor nodes from current node to the current tree root" | |
# Note this slicing (::-1) reverses the order, so we do indeed go from root to seed. | |
seed_lineage = seed_node.get_ancestors()[::-1] + [seed_node] | |
# We'll build a list of the subtrees off the seed lineage. | |
subtrees = [] | |
for i, lineage_node in enumerate(seed_lineage[:-1]): | |
for subtree in lineage_node.children: | |
# The subtree that continues down the seed lineage doesn't count. | |
if subtree != seed_lineage[i+1]: | |
subtrees.append(subtree) | |
# Repeatedly pass through this list of subtrees, grabbing the one | |
# closest leaf (to the seed lineage) from each, until we get how many we | |
# need. | |
while len(leaves_to_keep) < (n_keep - 1): # -1 because naive gets rerooted out, and we manually yield as below | |
for subtree in subtrees: | |
# Obtain all the leaves in this subtree that aren't already in leaves_to_keep. | |
leaves = [leaf for leaf in subtree.iter_leaves() if leaf not in leaves_to_keep] | |
if leaves: | |
# Add the leaf that's the closest to the seed lineages. | |
# Use subtree.up so that we are getting the distance from the | |
# node on the lineage from root to seed (rather than the root | |
# of the subtree). | |
leaves_to_keep.add(min(leaves, key=lambda leaf: distances(subtree.up, leaf))) | |
if len(leaves_to_keep) == n_keep - 1: | |
break | |
# Yield all the selected leaves (including naive and seed) | |
yield args.naive | |
for leaf in leaves_to_keep: | |
yield leaf.name | |
def min_adcl_selection(args): | |
""" | |
Minimize ADCL for a tree using pplacer suite. | |
""" | |
tipnames = [t.name for t in args.tree.iter_leaves()] | |
if len(tipnames) <= args.n_keep: | |
return tipnames | |
else: | |
# set up file for telling rppr min_adcl what sequences to always include (always include ids = aiids) | |
aiids_fn = args.output + '.aiids' | |
handle = file(aiids_fn, 'w') | |
lines = [line + "\n" for line in args.always_include] | |
handle.writelines(lines) | |
handle.flush() | |
handle.close() | |
# Set up the command for execution | |
command = "rppr min_adcl_tree --algorithm pam --leaves".split(" ") \ | |
+ [args.n_keep, "--always-include", args.output + '.aiids', args.tree_file] | |
sys.stderr.write("rppr command:\n") | |
sys.stderr.write(" ".join(str(x) for x in command) + "\n") | |
command = map(str, command) | |
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
output, stderr = proc.communicate() | |
# Print out any stderr for debugging; Should maybe also look for errors and fix | |
if stderr: | |
sys.stderr.write("rppr stderr:\n") | |
sys.stderr.write(stderr + "\n") | |
cut_names = [n for n in output.split("\n")] | |
keep_names = [x for x in tipnames if x not in cut_names] | |
if len(keep_names) > args.n_keep and len(cut_names) == 0: | |
raise Exception("cut names list empty and shouldn't be; perhaps the rppr subprocess failed?") | |
return keep_names | |
def tree_arg(tree_arg_value): | |
return Tree(tree_arg_value, format=1) | |
def get_args(): | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument( | |
'tree_file', | |
help='input newick tree file') | |
parser.add_argument( | |
'--strategy', choices=["min_adcl", "seed_lineage"], default="seed_lineage") | |
parser.add_argument( | |
'--naive', help='id of root [default \'naive\']', default='naive') | |
parser.add_argument( | |
'--seed', help='id of leaf [default \'seed\']') | |
parser.add_argument( | |
'--always-include', type=lambda x: x.split(','), help='comma separated list of ids to keep', | |
default=[]) | |
parser.add_argument( | |
'output', | |
help='file for output id list') | |
parser.add_argument( | |
'-n', '--n-keep', type=int, | |
help='number of sequences to keep [default: 100]', default=100) | |
args = parser.parse_args() | |
args.tree = tree_arg(args.tree_file) | |
leaf_names = set(args.tree.get_leaf_names()) | |
args.always_include = set( | |
filter(lambda leaf_name: leaf_name and leaf_name in leaf_names, | |
[args.naive, args.seed] + args.always_include)) | |
return args | |
def main(args): | |
selection_fn = seed_lineage_selection if args.strategy == "seed_lineage" else min_adcl_selection | |
out_handle = file(args.output, 'w') | |
for name in selection_fn(args): | |
# Writes to stdout | |
out_handle.write(name +"\n") | |
out_handle.close() | |
if __name__ == "__main__": | |
main(get_args()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment