1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
|
"""Output Biopython SeqRecords and SeqFeatures to GFF3 format.
The target format is GFF3, the current GFF standard:
http://www.sequenceontology.org/gff3.shtml
"""
from six.moves import urllib
from Bio import SeqIO
class _IdHandler:
"""Generate IDs for GFF3 Parent/Child relationships where they don't exist.
"""
def __init__(self):
self._prefix = "biopygen"
self._counter = 1
self._seen_ids = []
def _generate_id(self, quals):
"""Generate a unique ID not present in our existing IDs.
"""
gen_id = self._get_standard_id(quals)
if gen_id is None:
while 1:
gen_id = "%s%s" % (self._prefix, self._counter)
if gen_id not in self._seen_ids:
break
self._counter += 1
return gen_id
def _get_standard_id(self, quals):
"""Retrieve standardized IDs from other sources like NCBI GenBank.
This tries to find IDs from known key/values when stored differently
than GFF3 specifications.
"""
possible_keys = ["transcript_id", "protein_id"]
for test_key in possible_keys:
if test_key in quals:
cur_id = quals[test_key]
if isinstance(cur_id, tuple) or isinstance(cur_id, list):
return cur_id[0]
else:
return cur_id
return None
def update_quals(self, quals, has_children):
"""Update a set of qualifiers, adding an ID if necessary.
"""
cur_id = quals.get("ID", None)
# if we have an ID, record it
if cur_id:
if not isinstance(cur_id, list) and not isinstance(cur_id, tuple):
cur_id = [cur_id]
for add_id in cur_id:
self._seen_ids.append(add_id)
# if we need one and don't have it, create a new one
elif has_children:
new_id = self._generate_id(quals)
self._seen_ids.append(new_id)
quals["ID"] = [new_id]
return quals
class GFF3Writer:
"""Write GFF3 files starting with standard Biopython objects.
"""
def __init__(self):
pass
def write(self, recs, out_handle, include_fasta=False):
"""Write the provided records to the given handle in GFF3 format.
"""
id_handler = _IdHandler()
self._write_header(out_handle)
fasta_recs = []
try:
recs = iter(recs)
except TypeError:
recs = [recs]
for rec in recs:
self._write_rec(rec, out_handle)
self._write_annotations(rec.annotations, rec.id, len(rec.seq), out_handle)
for sf in rec.features:
sf = self._clean_feature(sf)
id_handler = self._write_feature(sf, rec.id, out_handle,
id_handler)
if include_fasta and len(rec.seq) > 0:
fasta_recs.append(rec)
if len(fasta_recs) > 0:
self._write_fasta(fasta_recs, out_handle)
def _clean_feature(self, feature):
quals = {}
for key, val in feature.qualifiers.items():
if not isinstance(val, (list, tuple)):
val = [val]
val = [str(x) for x in val]
quals[key] = val
feature.qualifiers = quals
# Support for Biopython 1.68 and above, which removed sub_features
if not hasattr(feature, "sub_features"):
feature.sub_features = []
clean_sub = [self._clean_feature(f) for f in feature.sub_features]
feature.sub_features = clean_sub
return feature
def _write_rec(self, rec, out_handle):
# if we have a SeqRecord, write out optional directive
if len(rec.seq) > 0:
out_handle.write("##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq)))
def _get_phase(self, feature):
if "phase" in feature.qualifiers:
phase = feature.qualifiers["phase"][0]
elif feature.type == "CDS":
phase = int(feature.qualifiers.get("codon_start", [1])[0]) - 1
else:
phase = "."
return str(phase)
def _write_feature(self, feature, rec_id, out_handle, id_handler,
parent_id=None):
"""Write a feature with location information.
"""
if feature.location.strand == 1:
strand = '+'
elif feature.location.strand == -1:
strand = '-'
else:
strand = '.'
# remove any standard features from the qualifiers
quals = feature.qualifiers.copy()
for std_qual in ["source", "score", "phase"]:
if std_qual in quals and len(quals[std_qual]) == 1:
del quals[std_qual]
# add a link to a parent identifier if it exists
if parent_id:
if not "Parent" in quals:
quals["Parent"] = []
quals["Parent"].append(parent_id)
quals = id_handler.update_quals(quals, len(feature.sub_features) > 0)
if feature.type:
ftype = feature.type
else:
ftype = "sequence_feature"
parts = [str(rec_id),
feature.qualifiers.get("source", ["feature"])[0],
ftype,
str(feature.location.start + 1), # 1-based indexing
str(feature.location.end),
feature.qualifiers.get("score", ["."])[0],
strand,
self._get_phase(feature),
self._format_keyvals(quals)]
out_handle.write("\t".join(parts) + "\n")
for sub_feature in feature.sub_features:
id_handler = self._write_feature(sub_feature, rec_id, out_handle,
id_handler, quals["ID"][0])
return id_handler
def _format_keyvals(self, keyvals):
format_kvs = []
for key in sorted(keyvals.keys()):
values = keyvals[key]
key = key.strip()
format_vals = []
if not isinstance(values, list) or isinstance(values, tuple):
values = [values]
for val in values:
val = urllib.parse.quote(str(val).strip(), safe=":/ ")
if ((key and val) and val not in format_vals):
format_vals.append(val)
format_kvs.append("%s=%s" % (key, ",".join(format_vals)))
return ";".join(format_kvs)
def _write_annotations(self, anns, rec_id, size, out_handle):
"""Add annotations which refer to an entire sequence.
"""
format_anns = self._format_keyvals(anns)
if format_anns:
parts = [rec_id, "annotation", "remark", "1", str(size if size > 1 else 1),
".", ".", ".", format_anns]
out_handle.write("\t".join(parts) + "\n")
def _write_header(self, out_handle):
"""Write out standard header directives.
"""
out_handle.write("##gff-version 3\n")
def _write_fasta(self, recs, out_handle):
"""Write sequence records using the ##FASTA directive.
"""
out_handle.write("##FASTA\n")
SeqIO.write(recs, out_handle, "fasta")
def write(recs, out_handle, include_fasta=False):
"""High level interface to write GFF3 files from SeqRecords and SeqFeatures.
If include_fasta is True, the GFF3 file will include sequence information
using the ##FASTA directive.
"""
writer = GFF3Writer()
return writer.write(recs, out_handle, include_fasta)
|