Skip to content

Instantly share code, notes, and snippets.

@lauradoepker
Last active March 5, 2018 20:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lauradoepker/7dcf78b5a6cca3a57f5efe54f6a2d228 to your computer and use it in GitHub Desktop.
Save lauradoepker/7dcf78b5a6cca3a57f5efe54f6a2d228 to your computer and use it in GitHub Desktop.
Pruning script for seeded antibody clonal families
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
process_asr.py
purpose: Given an outputfile from one of the PHYLIP tools - `dnaml` or `dnapars` - produce an alignment (including ancestral sequences) and a newick tree (with matching internal node lables)
usage: dependency of prune.py
authors (in order of contribution): Chris Small, William DeWitt, Chris Warth
note: this is part of a larger package that is in development
dependencies: etetoolkit.org, biopython.org,
"""
from __future__ import print_function
import argparse
import ete3
import csv
import re
import os
from warnings import warn
from collections import defaultdict
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
# iterate over recognized sections in the phylip output file.
def sections(fh):
patterns = {
"parents": "\s+between\s+and\s+length",
("sequences", 'dnaml'): "\s*node\s+reconstructed\s+sequence",
('sequences', 'dnapars'): "from\s+to\s+any steps"}
patterns = {k: re.compile(v, re.IGNORECASE) for (k,v) in patterns.items()}
for line in fh:
for k, pat in patterns.items():
if pat.match(line):
yield k
break
# iterate over entries in the distance section
def iter_edges(fh):
# 152 >naive2 0.01208 ( zero, 0.02525) **
pat = re.compile("\s*(?P<parent>\w+)\s+(?P<child>[\w>_.-]+)\s+(?P<distance>\d+\.\d+)")
# drop the header underline
fh.next()
matches = 0
for line in fh:
m = pat.match(line)
if m:
matches += 1
yield (m.group("child"), m.group("parent"), float(m.group("distance")))
# We only want to break at the very end of the block of matches; dnaml has an extra blank line between
# header and rows that dnapars doesn't
elif not matches:
continue
else:
break
# iterate over entries in the sequences section
def parse_seqdict(fh, mode='dnaml'):
# 152 sssssssssG AGGTGCAGCT GTTGGAGTCT GGGGGAGGCT TGGTACAGCC TGGGGGGTCC
seqs = defaultdict(str)
patterns = {
'dnaml': re.compile("^\s*(?P<id>[a-zA-Z0-9>_.-]*)\s+(?P<seq>[a-zA-Z \-]+)"),
'dnapars': re.compile("^\s*\S+\s+(?P<id>[a-zA-Z0-9>_.-]*)\s+(yes\s+|no\s+|maybe\s+)?(?P<seq>[a-zA-Z \-]+)")}
fh.next()
for line in fh:
m = patterns[mode].match(line)
if m:
seqs[m.group("id")] += m.group("seq").replace(" ", "")
elif line.rstrip() == '':
continue
else:
break
return seqs
# parse the dnaml output file and return data strictures containing a
# list biopython.SeqRecords and a dict containing adjacency
# relationships and distances between nodes.
def parse_outfile(outfile, seqmeta, seqname_mapping=None):
'''parse phylip outfile'''
name_map = seqname_mapping or {seq_id[0:10]: seq_id for seq_id in seqmeta}
if len(name_map) != len(seqmeta) and not seqname_mapping:
raise RuntimeError("Conflicting shortened names!")
# Keeping track of which names match up for validation and error handling
goods = set()
bads = set()
def full_name(x):
name = name_map.get(x)
if not name and not re.match('\d*$', x):
bads.add(x)
elif name:
goods.add(x)
return name or x
# Ugg... for compilation need to let python know that these will definely both be defined :-/
sequences, parents = [], {}
with open(outfile, 'rU') as fh:
for sect in sections(fh):
if sect == 'parents':
parents = { full_name(parent): (full_name(child), dist) for parent, child, dist in iter_edges(fh) }
elif sect[0] == 'sequences':
d = parse_seqdict(fh, sect[1])
sequences = [ SeqRecord(Seq(seq), id=full_name(seq_id), description="")
for seq_id, seq in d.items() ]
else:
raise RuntimeError("unrecognized phylip setion = {}".format(sect))
if 'naive' in bads:
warn("Uh... naive in bad sequences?")
bads.remove('naive')
if bads:
print("good sequences:", sorted(goods))
print("bad sequences:", sorted(bads))
print("name_map misses:")
for k, v in sorted(name_map.items()):
if k not in goods:
print(" ", k, ":", v)
raise RuntimeError("mismatched sequence/node names")
# sanity check; a valid tree should have exactly one node that is parentless
if not len(parents) == len(sequences) - 1:
raise RuntimeError('invalid results attempting to parse {}: there are {} parentless sequences'.format(outfile, len(sequences) - len(parents)))
return sequences, parents
# build a tree from a set of sequences and an adjacency dict.
def build_tree(sequences, parents):
# build an ete tree
# first a dictionary of disconnected nodes
def mknode(r):
n = ete3.Tree(
name=r.id,
dist=parents[r.id][1] if r.id in parents else 0,
format=1)
n.add_features(seq=r)
return n
nodes = { r.id: mknode(r) for r in sequences }
# connect the nodes using the parent data
orphan_nodes = 0
for r in sequences:
if r.id in parents:
nodes[parents[r.id][0]].add_child(nodes[r.id])
else:
# node without parent becomes root
tree = nodes[r.id]
orphan_nodes += 1
# there can only be one root
if orphan_nodes != 1:
raise RuntimeError("The tree is not properly rooted; expected a single root but there are {}.".format(orphan_nodes))
return tree
def find_node(tree, pattern):
regex = re.compile(pattern).search
nodes = [ node for node in tree.traverse() for m in [regex(node.name)] if m]
if not nodes:
warn("Cannot find matching node; looking for name matching '{}'".format(pattern))
return
else:
if len(nodes) > 1:
warn("multiple nodes found; using first one.\nfound: {}".format([n.name for n in nodes]))
return nodes[0]
# reroot the tree on node matching regex pattern.
# Usually this is used to root on the naive germline sequence with name 'naive'
def reroot_tree(tree, pattern='naive'):
# find all nodes matching pattern
node = find_node(tree, pattern)
if tree != node:
# In general this would be necessary, but we are actually assuming that naive has been set as an
# outgroup in dnaml, and if it hasn't, we want to raise an error, as below
#tree.set_outgroup(node)
# Raise error if naive isn't in root's children
if node not in tree.children:
raise ValueError("Naive node not set as outgroup; Check dnaml/asr run to make sure this is the case")
# This actually assumes the `not in` condition above, but we check as above for clarity
tree.remove_child(node)
node.add_child(tree)
tree.dist = node.dist
node.dist = 0
tree = node
return tree
def seqname_mapping_arg(filename):
with open(filename, 'r') as fh:
return {row['new_id']: row['original_id'] for row in csv.DictReader(fh)}
def get_args():
def seqmeta_input(fname):
with open(fname, 'r') as fhandle:
return dict((row['sequence'], row) for row in csv.DictReader(fhandle))
def existing_file(fname):
"""
Argparse type for an existing file
"""
if not os.path.isfile(fname):
raise ValueError("Invalid file: " + str(fname))
return fname
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'phylip_outfile', type=existing_file,
help='dnaml outfile (verbose output with inferred ancestral sequences, option 5).')
parser.add_argument(
'seqmeta', type=seqmeta_input,
help="Sequence metadata file for annotating mut_freqs")
parser.add_argument(
'--seqname-mapping',
type=seqname_mapping_arg,
help="optional csv file translating original_id to new_id for phylip 10 char cludgery")
parser.add_argument(
'--outdir', default='.',
help="output directory where results are left. [default '%(default)s']")
parser.add_argument(
'--basename', help="basename of output files. [default 'basename(DNAML)']")
parser.add_argument(
'--seed', type=str, help="id of leaf [default 'seed']", default='seed')
return parser.parse_args()
def main():
args = get_args()
# basename of outputfiles is taken from --basename if supplied, otherwise basename of DNAML.
basename = args.basename if args.basename else os.path.basename(args.phylip_outfile)
outbase = os.path.join(args.outdir, os.path.splitext(basename)[0])
sequences, parents = parse_outfile(args.phylip_outfile, args.seqmeta, args.seqname_mapping)
if not sequences or not parents:
raise RuntimeError("No sequences were available; are you sure this is a dnaml output file?")
# write alignment to fasta
fname = outbase + '.fa'
with open(fname, "w") as fh:
SeqIO.write(sequences, fh, "fasta")
tree = build_tree(sequences, parents)
tree = reroot_tree(tree, 'naive')
# write newick file
fname = outbase + '.nwk'
tree.write(format=1, format_root_node=True, outfile=fname)
if __name__ == "__main__":
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
prune.py
purpose: Return a reduced set of taxa based on either seed lineage selection ("pruning") or min ADCL selection ("trimming")
usage: Antibody (BF520.1) lineage reconstruction. This script pares down a list (fasta file input) of clonal antibody sequences to 100 sequences for further analysis by BEAST.
authors (in order of contribution): William DeWitt, Frederick A Matsen IV, Chris Small, Chris Warth, Laura Noges
note: this is part of a larger package that is in development
dependencies: etetoolkit.org, biopython.org, github.com/matsen/pplacer
"""
from ete3 import Tree
from process_asr import find_node, reroot_tree
import subprocess
import argparse
import sys
def seed_lineage_selection(args):
"""
Take the closest taxa on subtrees branching off the root (naive) to seed lineage.
E.g. given (((seed:1,(l3:1,l4:2)s2:1,(l5:3,l6:4)s3:1)b:1,(l1:1,l2:2)s1:1),naive:1);
/-seed
|
| /-l3
/-|--|
| | \-l4
| |
| | /-l5
/-| \-|
| | \-l6
| |
--| | /-l1
| \-|
| \-l2
|
\-naive
return (naive0, l3, l1, l5, seed)
We cycle through these subtrees from root to seed, always taking the
closest taxon to the root to seed lineage, until we have gotten the required
number of taxa.
"""
tree = args.tree
# Set args.n_keep to the min of requested value and the actual number of seqs (make sure to do this before
# rerooting, or len(tree) will have decremented...)
n_keep = min(args.n_keep, len(tree))
# Reroot the tree on naive
naive_node = find_node(tree, args.naive)
tree.set_outgroup(naive_node)
tree = reroot_tree(tree, args.naive)
# Collect ids of nodes on the seed lineage.
seed_node = find_node(tree, args.seed)
if args.seed is 'seed' and seed_node is None:
seed_node = tree.get_farthest_leaf()[0]
distance_dict = {}
def distances(lineage_node, leaf):
if leaf not in distance_dict:
distance_dict[leaf] = lineage_node.get_distance(leaf)
return distance_dict[leaf]
# Iterate over seed lineage and find closest taxon from each branch, and
# repeat until we have n_keep leaf sequences.
leaves_to_keep = set(find_node(tree, n) for n in args.always_include if tree.get_leaves_by_name(n))
# Extract the sequence of nodes on lineage from root to seed.
# Note: ete doc suggests get_ancestors() would include the seed node, but it doesn't.
# "Returns the list of all ancestor nodes from current node to the current tree root"
# Note this slicing (::-1) reverses the order, so we do indeed go from root to seed.
seed_lineage = seed_node.get_ancestors()[::-1] + [seed_node]
# We'll build a list of the subtrees off the seed lineage.
subtrees = []
for i, lineage_node in enumerate(seed_lineage[:-1]):
for subtree in lineage_node.children:
# The subtree that continues down the seed lineage doesn't count.
if subtree != seed_lineage[i+1]:
subtrees.append(subtree)
# Repeatedly pass through this list of subtrees, grabbing the one
# closest leaf (to the seed lineage) from each, until we get how many we
# need.
while len(leaves_to_keep) < (n_keep - 1): # -1 because naive gets rerooted out, and we manually yield as below
for subtree in subtrees:
# Obtain all the leaves in this subtree that aren't already in leaves_to_keep.
leaves = [leaf for leaf in subtree.iter_leaves() if leaf not in leaves_to_keep]
if leaves:
# Add the leaf that's the closest to the seed lineages.
# Use subtree.up so that we are getting the distance from the
# node on the lineage from root to seed (rather than the root
# of the subtree).
leaves_to_keep.add(min(leaves, key=lambda leaf: distances(subtree.up, leaf)))
if len(leaves_to_keep) == n_keep - 1:
break
# Yield all the selected leaves (including naive and seed)
yield args.naive
for leaf in leaves_to_keep:
yield leaf.name
def min_adcl_selection(args):
"""
Minimize ADCL for a tree using pplacer suite.
"""
tipnames = [t.name for t in args.tree.iter_leaves()]
if len(tipnames) <= args.n_keep:
return tipnames
else:
# set up file for telling rppr min_adcl what sequences to always include (always include ids = aiids)
aiids_fn = args.output + '.aiids'
handle = file(aiids_fn, 'w')
lines = [line + "\n" for line in args.always_include]
handle.writelines(lines)
handle.flush()
handle.close()
# Set up the command for execution
command = "rppr min_adcl_tree --algorithm pam --leaves".split(" ") \
+ [args.n_keep, "--always-include", args.output + '.aiids', args.tree_file]
sys.stderr.write("rppr command:\n")
sys.stderr.write(" ".join(str(x) for x in command) + "\n")
command = map(str, command)
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, stderr = proc.communicate()
# Print out any stderr for debugging; Should maybe also look for errors and fix
if stderr:
sys.stderr.write("rppr stderr:\n")
sys.stderr.write(stderr + "\n")
cut_names = [n for n in output.split("\n")]
keep_names = [x for x in tipnames if x not in cut_names]
if len(keep_names) > args.n_keep and len(cut_names) == 0:
raise Exception("cut names list empty and shouldn't be; perhaps the rppr subprocess failed?")
return keep_names
def tree_arg(tree_arg_value):
return Tree(tree_arg_value, format=1)
def get_args():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'tree_file',
help='input newick tree file')
parser.add_argument(
'--strategy', choices=["min_adcl", "seed_lineage"], default="seed_lineage")
parser.add_argument(
'--naive', help='id of root [default \'naive\']', default='naive')
parser.add_argument(
'--seed', help='id of leaf [default \'seed\']')
parser.add_argument(
'--always-include', type=lambda x: x.split(','), help='comma separated list of ids to keep',
default=[])
parser.add_argument(
'output',
help='file for output id list')
parser.add_argument(
'-n', '--n-keep', type=int,
help='number of sequences to keep [default: 100]', default=100)
args = parser.parse_args()
args.tree = tree_arg(args.tree_file)
leaf_names = set(args.tree.get_leaf_names())
args.always_include = set(
filter(lambda leaf_name: leaf_name and leaf_name in leaf_names,
[args.naive, args.seed] + args.always_include))
return args
def main(args):
selection_fn = seed_lineage_selection if args.strategy == "seed_lineage" else min_adcl_selection
out_handle = file(args.output, 'w')
for name in selection_fn(args):
# Writes to stdout
out_handle.write(name +"\n")
out_handle.close()
if __name__ == "__main__":
main(get_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment