1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
|
#!/usr/bin/env python3
"""
Created in Jun 2016
@author: Harald Voehringer
"""
from itertools import groupby
from collections import namedtuple, defaultdict
import textwrap
DEBUG = False
FASTA = namedtuple('FASTA', ['header', 'seq'])
PDBDATA = namedtuple('PDBDATA', ['entry', 'res', 'rfr', 'comp', 'met'])
def as_pairs(fasta):
""" Reads in fasta headers as defined """
for header, group in groupby(fasta, lambda x: x[0] == '>'):
if header:
line = next(group)
identifier = line.split(' ')[0].split('>')[1]
header_line = str(line.strip())
else:
sequence = ''.join(line.strip() for line in group).upper()
data = FASTA(header_line, sequence)
yield identifier, data
def read_fasta(fasta_file):
""" Reads in fasta sequences with help of the as_pairs function."""
fasta_dic = dict()
duplicate = 0
with open(fasta_file) as fasta:
for title, sequence in as_pairs(fasta):
if title not in fasta_dic:
fasta_dic[title] = sequence
else:
duplicate +=1
if duplicate != 0:
print ('! Warning found duplicate {num} fasta.'.format(
num = duplicate))
return fasta_dic
def read_fasta_annotations(fname):
""" Reads the information about resolution, R-free and completness which
be outputed by cif2fasta."""
annotations = dict()
with open(fname) as fh:
for line in fh:
if len(line) > 0 and line[0] == '#':
continue
identifier, res, r_free, comp, method = line.strip().split('\t')
try:
res = float(res)
except ValueError:
res = None
try:
r_free = float(r_free)
except ValueError:
r_free = None
try:
comp = float(comp)
except ValueError:
comp = None
annotations[identifier] = PDBDATA(identifier, res, r_free, comp, method)
return annotations
def read_cluster(cluster_file):
""" Reads in the clusters (this is the output of MMseqs). """
cluster = defaultdict(set)
with open(cluster_file) as fh:
for line in fh:
exemplar, node = line.split()
if node in cluster[exemplar]:
raise Exception('! Please check the clustering procedure: {n} was found twice in cluster {e}.'.format(
n = node,
e = exemplar))
else:
cluster[exemplar].add(node)
return cluster
def read_pdblist(in_file):
pdb_list = set()
with open(in_file) as fh:
for line in fh:
if line.startswith('#'):
continue
strip_line = line.strip()
pdb_code = strip_line.split('_')[0]
# this is a very very basic check
if len(pdb_code) != 4:
print ('! Warning: {line} seems to be an incorrect identifer. Skipping it.'.format(
line = strip_line))
continue
pdb = strip_line.upper()
pdb_list.add(pdb)
return pdb_list
def select_sequences(clusters, annotations):
selected_sequences = set()
for idx, cluster in enumerate(clusters):
nodes = [ annotations[entry] for entry in clusters[cluster] ]
if DEBUG:
print ('Processing Cluster {c} ({i}): {m}'.format(
c = cluster,
i = idx,
m = ', '.join([x.entry for x in nodes])))
# select the best entries of the cluster
best_res = float('inf')
best_entry_res = None
best_rfr = float('inf')
best_entry_rfr = None
best_comp = -float('inf')
best_entry_comp = None
# iterate through each entry in nodes while selecting the representative sequence
for node in nodes:
if (node.res is not None) and (node.res < best_res):
best_res = node.res
best_entry_res = node.entry
if (node.rfr is not None) and (node.rfr < best_rfr):
best_rfr = node.rfr
best_entry_rfr = node.entry
if (node.comp is not None) and (node.comp > best_comp):
best_comp = node.comp
best_entry_comp = node.entry
if best_entry_res is not None:
selected_sequences.add(best_entry_res)
if DEBUG:
print (' - Selected {n} (best resolution = {r}).'.format(
n = best_entry_res,
r = best_res))
if best_entry_rfr is not None:
selected_sequences.add(best_entry_rfr)
if DEBUG:
print (' - Selected {n} (best R-free = {r}).'.format(
n = best_entry_rfr,
r = best_rfr))
if best_entry_comp is not None:
selected_sequences.add(best_entry_comp)
if DEBUG:
print (' - Selected {n} (best completness = {r}).'.format(
n = best_entry_comp,
r = best_comp))
if best_entry_res == None and best_entry_rfr == None and best_entry_comp == None:
print ('! Warning: Did not find any representative entry for cluster {c}.'.format(
c = cluster))
return selected_sequences
def write_sequences(out_file, fasta_db, selected_sequences):
""" Writes selected sequences to a fasta file."""
with open(out_file, 'w') as fh:
for seq in selected_sequences:
fasta_entry = '{h}\n{seq}\n'.format(
h = fasta_db[seq].header,
seq = '\n'.join(textwrap.wrap(fasta_db[seq].seq, 80)))
fh.write(fasta_entry)
def arg():
import argparse
description = """
pdbfilter.py selects from sequence clusters (determined by MMSeqs) the sequences
which have the best resolution, R-free factor and/or completness and writes them to a fasta file.
""".replace('\n', '')
epilog = '2016 Harald Voehringer'
# Initiate a ArgumentParser Class
parser = argparse.ArgumentParser(description = description, epilog = epilog)
# Call add_options to the parser
parser.add_argument('fasta', help = 'input fasta file (created by cif2fasta.py)', metavar = 'FILE')
parser.add_argument('cluster', help = 'sequence clusters (MMseqs)', metavar = 'FILE')
parser.add_argument('annotations', help = 'annotations file (created by cif2fasta using the -p flag, contains information about resolution, R-free and completness of sequences).', metavar = 'FILE')
parser.add_argument('out_file', help = 'output fasta file', metavar = 'FILE')
parser.add_argument('-i', '--include', help = 'include PDB chains', metavar = 'FILE')
parser.add_argument('-r', '--remove', help = 'exclude PDB chains', metavar = 'FILE')
parser.add_argument('-v', '--verbose', action = 'store_true', help = 'verbose mode')
return parser
def main():
import sys
parser = arg()
args = parser.parse_args(sys.argv[1:])
global DEBUG
if args.verbose:
DEBUG = True
fasta = read_fasta(args.fasta)
clu70 = read_cluster(args.cluster)
annot = read_fasta_annotations(args.annotations)
print ("Found {i} clusters.".format(
i = len(clu70.keys())))
# choose representative sequences from clusters
selection = select_sequences(clu70, annot)
# make sure that pdbs specified in the argument are included
if args.include:
to_include = read_pdblist(args.include)
for pdb in to_include:
if pdb in fasta.keys():
if pdb not in selection:
if DEBUG:
print ('Adding {p}.'.format(
p = pdb))
selection.add(pdb)
else:
print ('! Warning: {p} was not found in input fasta.'.format(
p = pdb))
# removes entries
if args.remove:
to_remove = read_pdblist(args.remove)
for pdb in to_remove:
if pdb in selection:
if DEBUG:
print ('Removing {p}.'.format(
p = pdb))
selection.remove(pdb)
# write them to file
write_sequences(args.out_file, fasta, selection)
if __name__ == "__main__":
main()
|