"""Parse GFF files into features attached to Biopython SeqRecord objects.
This deals with GFF3-formatted files, a tab-delimited format for storing
sequence features and annotations:
http://www.sequenceontology.org/gff3.shtml
It will also deal with older GFF versions (GTF/GFF2):
http://www.sanger.ac.uk/Software/formats/GFF/GFF_Spec.shtml
http://mblab.wustl.edu/GTF22.html
The implementation utilizes map/reduce parsing of GFF using Disco. Disco
(http://discoproject.org) is a Map-Reduce framework for Python utilizing
Erlang for parallelization. The code works on a single processor without
Disco using the same architecture.
"""
import os
import copy
import re
import collections
import io
import itertools
import six
from six.moves import urllib
# Make defaultdict compatible with versions of Python older than 2.5
try:
collections.defaultdict
except AttributeError:
import _utils
collections.defaultdict = _utils.defaultdict
unknown_seq_avail = False
try:
from Bio.Seq import UnknownSeq
unknown_seq_avail = True
except ImportError:
# UnknownSeq was removed starting with Biopython 1.81
from Bio.Seq import _UndefinedSequenceData
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqFeature
from Bio import SeqIO
def _gff_line_map(line, params):
"""Map part of Map-Reduce; parses a line of GFF into a dictionary.
Given an input line from a GFF file, this:
- decides if the file passes our filtering limits
- if so:
- breaks it into component elements
- determines the type of attribute (flat, parent, child or annotation)
- generates a dictionary of GFF info which can be serialized as JSON
"""
def _merge_keyvals(parts):
"""Merge key-values escaped by quotes that are improperly split at semicolons.
"""
out = []
for i, p in enumerate(parts):
if i > 0 and len(p) == 1 and p[0].endswith('"') and not p[0].startswith('"'):
if out[-1][-1].startswith('"'):
prev_p = out.pop(-1)
to_merge = prev_p[-1]
prev_p[-1] = "%s; %s" % (to_merge, p[0])
out.append(prev_p)
else:
out.append(p)
return out
gff3_kw_pat = re.compile(r"\w+=")
def _split_keyvals(keyval_str):
"""Split key-value pairs in a GFF2, GTF and GFF3 compatible way.
GFF3 has key value pairs like:
count=9;gene=amx-2;sequence=SAGE:aacggagccg
GFF2 and GTF have:
Sequence "Y74C9A" ; Note "Clone Y74C9A; Genbank AC024206"
name "fgenesh1_pg.C_chr_1000003"; transcriptId 869
"""
quals = collections.defaultdict(list)
if keyval_str is None:
return quals
# ensembl GTF has a stray semi-colon at the end
if keyval_str[-1] == ';':
keyval_str = keyval_str[:-1]
# GFF2/GTF has a semi-colon with at least one space after it.
# It can have spaces on both sides; wormbase does this.
# GFF3 works with no spaces.
# Split at the first one we can recognize as working
parts = keyval_str.split(" ; ")
if len(parts) == 1:
parts = [x.strip() for x in keyval_str.split(";")]
# check if we have GFF3 style key-vals (with =)
is_gff2 = True
if gff3_kw_pat.match(parts[0]):
is_gff2 = False
key_vals = _merge_keyvals([p.split('=') for p in parts])
# otherwise, we are separated by a space with a key as the first item
else:
pieces = []
for p in parts:
# fix misplaced semi-colons in keys in some GFF2 files
if p and p[0] == ';':
p = p[1:]
pieces.append(p.strip().split(" "))
key_vals = [(p[0], " ".join(p[1:])) for p in pieces]
for item in key_vals:
# standard in-spec items are key=value
if len(item) == 2:
key, val = item
# out-of-spec files can have just key values. We set an empty value
# which will be changed to true later to standardize.
else:
assert len(item) == 1, item
key = item[0]
val = ''
# remove quotes in GFF2 files
quoted = False
if (len(val) > 0 and val[0] == '"' and val[-1] == '"'):
quoted = True
val = val[1:-1]
if val:
if quoted:
quals[key].append(val)
else:
quals[key].extend([v for v in val.split(',') if v])
# if we don't have a value, make this a key=True/False style
# attribute
else:
quals[key].append('true')
for key, vals in quals.items():
quals[key] = [urllib.parse.unquote(v) for v in vals]
return quals, is_gff2
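# Illustrative inputs and the values the code above produces (the
# attribute strings are invented examples, not doctests):
#   _split_keyvals("count=9;gene=amx-2;sequence=SAGE:aacggagccg")
#     -> ({'count': ['9'], 'gene': ['amx-2'],
#          'sequence': ['SAGE:aacggagccg']}, False)
#   _split_keyvals('Sequence "Y74C9A" ; Note "Clone Y74C9A; Genbank AC024206"')
#     -> ({'Sequence': ['Y74C9A'],
#          'Note': ['Clone Y74C9A; Genbank AC024206']}, True)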
def _nest_gff2_features(gff_parts):
"""Provide nesting of GFF2 transcript parts with transcript IDs.
Exons and coding sequences are mapped to a parent via a transcript_id
in GFF2. Genome centers implement this differently, and this function
attempts to resolve those differences and map features to the GFF3
way of doing things.
"""
# map protein or transcript ids to a parent
for transcript_id in ["transcript_id", "transcriptId", "proteinId"]:
try:
gff_parts["quals"]["Parent"] = \
gff_parts["quals"][transcript_id]
break
except KeyError:
pass
# case for WormBase GFF -- everything labelled as Transcript or CDS
for flat_name in ["Transcript", "CDS"]:
if flat_name in gff_parts["quals"]:
# parent types
if gff_parts["type"] in [flat_name]:
if not gff_parts["id"]:
gff_parts["id"] = gff_parts["quals"][flat_name][0]
gff_parts["quals"]["ID"] = [gff_parts["id"]]
# children types
elif gff_parts["type"] in ["intron", "exon", "three_prime_UTR",
"coding_exon", "five_prime_UTR", "CDS", "stop_codon",
"start_codon"]:
gff_parts["quals"]["Parent"] = gff_parts["quals"][flat_name]
break
return gff_parts
strand_map = {'+' : 1, '-' : -1, '?' : None, None: None}
line = line.strip()
if line[:2] == "##":
return [('directive', line[2:])]
elif line and line[0] != "#":
parts = line.split('\t')
should_do = True
if params.limit_info:
for limit_name, limit_values in params.limit_info.items():
cur_id = tuple([parts[i] for i in
params.filter_info[limit_name]])
if cur_id not in limit_values:
should_do = False
break
if should_do:
assert len(parts) >= 8, line
# not python2.4 compatible but easier to understand
#gff_parts = [(None if p == '.' else p) for p in parts]
gff_parts = []
for p in parts:
if p == ".":
gff_parts.append(None)
else:
gff_parts.append(p)
gff_info = dict()
# collect all of the base qualifiers for this item
if len(parts) > 8:
quals, is_gff2 = _split_keyvals(gff_parts[8])
else:
quals, is_gff2 = collections.defaultdict(list), False
gff_info["is_gff2"] = is_gff2
if gff_parts[1]:
quals["source"].append(gff_parts[1])
if gff_parts[5]:
quals["score"].append(gff_parts[5])
if gff_parts[7]:
quals["phase"].append(gff_parts[7])
gff_info['quals'] = dict(quals)
gff_info['rec_id'] = gff_parts[0]
# if we are describing a location, then we are a feature
if gff_parts[3] and gff_parts[4]:
gff_info['location'] = [int(gff_parts[3]) - 1,
int(gff_parts[4])]
gff_info['type'] = gff_parts[2]
gff_info['id'] = quals.get('ID', [''])[0]
gff_info['strand'] = strand_map.get(gff_parts[6], None)
if is_gff2:
gff_info = _nest_gff2_features(gff_info)
# features that have parents need to link so we can pick up
# the relationship
if "Parent" in gff_info['quals']:
# check for self referential parent/child relationships
# remove the ID, which is not useful
for p in gff_info['quals']['Parent']:
if p == gff_info['id']:
gff_info['id'] = ''
del gff_info['quals']['ID']
break
final_key = 'child'
elif gff_info['id']:
final_key = 'parent'
# Handle flat features
else:
final_key = 'feature'
# otherwise, associate these annotations with the full record
else:
final_key = 'annotation'
# simplejson is supplied by Disco's required_modules when this runs
# as a parallelized map job (see DiscoGFFParser._gff_process)
if params.jsonify:
return [(final_key, simplejson.dumps(gff_info))]
else:
return [(final_key, gff_info)]
return []
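# Sketch of the mapping for a simple GFF3 line, with params as returned
# by GFFExaminer()._get_local_params() and an invented input line:
#   line = "chr1\tsrc\tgene\t1\t100\t.\t+\t.\tID=gene1"
#   _gff_line_map(line, params)
#     -> [('parent', {'rec_id': 'chr1', 'type': 'gene', 'id': 'gene1',
#                     'location': [0, 100], 'strand': 1, 'is_gff2': False,
#                     'quals': {'ID': ['gene1'], 'source': ['src']}})]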
def _gff_line_reduce(map_results, out, params):
"""Reduce part of Map-Reduce; combines results of parsed features.
"""
final_items = dict()
for gff_type, final_val in map_results:
if params.jsonify and gff_type not in ['directive']:
final_val = simplejson.loads(final_val)
try:
final_items[gff_type].append(final_val)
except KeyError:
final_items[gff_type] = [final_val]
for key, vals in final_items.items():
if params.jsonify:
vals = simplejson.dumps(vals)
out.add(key, vals)
class _MultiIDRemapper:
"""Provide an ID remapping for cases where a parent has a non-unique ID.
Real life GFF3 cases have non-unique ID attributes, which we fix here
by using the unique sequence region to assign children to the right
parent.
"""
def __init__(self, base_id, all_parents):
self._base_id = base_id
self._parents = all_parents
def remap_id(self, feature_dict):
rstart, rend = feature_dict['location']
for index, parent in enumerate(self._parents):
pstart, pend = parent['location']
if rstart >= pstart and rend <= pend:
if index > 0:
return ("%s_%s" % (self._base_id, index + 1))
else:
return self._base_id
# if we haven't found a location match but the parent is unambiguous, return it
if len(self._parents) == 1:
return self._base_id
raise ValueError("Did not find remapped ID location: %s, %s, %s" % (
self._base_id, [p['location'] for p in self._parents],
feature_dict['location']))
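# Hedged illustration of the remapping, with invented coordinates; two
# parents share the ID "gene1" and children resolve by containment:
#   remapper = _MultiIDRemapper("gene1",
#                               [dict(location=[0, 500]),
#                                dict(location=[1000, 1500])])
#   remapper.remap_id(dict(location=[10, 200]))    # -> 'gene1'
#   remapper.remap_id(dict(location=[1100, 1200])) # -> 'gene1_2'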
class _AbstractMapReduceGFF:
"""Base class providing general GFF parsing for local and remote classes.
This class should be subclassed to provide a concrete class to parse
GFF under specific conditions. These classes need to implement
the _gff_process function, which returns a dictionary of SeqRecord
information.
"""
def __init__(self, create_missing=True):
"""Initialize GFF parser
create_missing - If True, create blank records for GFF ids not in
the base_dict. If False, an error will be raised.
"""
self._create_missing = create_missing
self._map_fn = _gff_line_map
self._reduce_fn = _gff_line_reduce
self._examiner = GFFExaminer()
def _gff_process(self, gff_files, limit_info, target_lines=None):
raise NotImplementedError("Derived class must define")
def parse(self, gff_files, base_dict=None, limit_info=None):
"""Parse a GFF file, returning an iterator of SeqRecords.
base_dict - A base dictionary of SeqRecord objects which may be
pre-populated with sequences and other features. The new features from
the GFF file will be added to this dictionary.
limit_info - A dictionary specifying the regions of the GFF file
which should be extracted. This allows only relevant portions of a file
to be parsed.
"""
for rec in self.parse_in_parts(gff_files, base_dict, limit_info):
yield rec
def parse_in_parts(self, gff_files, base_dict=None, limit_info=None,
target_lines=None):
"""Parse a region of a GFF file specified, returning info as generated.
target_lines -- The number of lines in the file which should be used
for each partial parse. This should be determined based on available
memory.
"""
for results in self.parse_simple(gff_files, limit_info, target_lines):
if base_dict is None:
cur_dict = dict()
else:
cur_dict = copy.deepcopy(base_dict)
cur_dict = self._results_to_features(cur_dict, results)
all_ids = list(cur_dict.keys())
all_ids.sort()
for cur_id in all_ids:
yield cur_dict[cur_id]
def parse_simple(self, gff_files, limit_info=None, target_lines=1):
"""Simple parse which does not build or nest features.
This returns a simple dictionary representation of each line in the
GFF file.
"""
# gracefully handle a single file passed
if not isinstance(gff_files, (list, tuple)):
gff_files = [gff_files]
limit_info = self._normalize_limit_info(limit_info)
for results in self._gff_process(gff_files, limit_info, target_lines):
yield results
def _normalize_limit_info(self, limit_info):
"""Turn all limit information into tuples for identical comparisons.
"""
final_limit_info = {}
if limit_info:
for key, values in limit_info.items():
final_limit_info[key] = []
for v in values:
if isinstance(v, str):
final_limit_info[key].append((v,))
else:
final_limit_info[key].append(tuple(v))
return final_limit_info
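# For example (invented values):
#   {'gff_id': ['chr1', ('chr2',)]} -> {'gff_id': [('chr1',), ('chr2',)]}
# so string and tuple limits both compare as tuples against file columns.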
def _results_to_features(self, base, results):
"""Add parsed dictionaries of results to Biopython SeqFeatures.
"""
base = self._add_annotations(base, results.get('annotation', []))
for feature in results.get('feature', []):
(_, base) = self._add_toplevel_feature(base, feature)
base = self._add_parent_child_features(base, results.get('parent', []),
results.get('child', []))
base = self._add_seqs(base, results.get('fasta', []))
base = self._add_directives(base, results.get('directive', []))
return base
def _add_directives(self, base, directives):
"""Handle any directives or meta-data in the GFF file.
Relevant items are added as annotation meta-data to each record.
"""
dir_keyvals = collections.defaultdict(list)
for directive in directives:
parts = directive.split()
if len(parts) > 1:
key = parts[0]
if len(parts) == 2:
val = parts[1]
else:
val = tuple(parts[1:])
# specific directives that need special handling
if key == "sequence-region": # convert to Python 0-based coordinates
if len(val) == 2: # handle regions missing contig
val = (int(val[0]) - 1, int(val[1]))
elif len(val) == 3:
val = (val[0], int(val[1]) - 1, int(val[2]))
dir_keyvals[key].append(val)
for key, vals in dir_keyvals.items():
for rec in base.values():
self._add_ann_to_rec(rec, key, vals)
return base
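# For example, the directive "##sequence-region chr1 1 1000" is stored
# under the annotation key 'sequence-region' as ('chr1', 0, 1000), with
# the 1-based start shifted to a Python 0-based coordinate.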
def _get_matching_record_id(self, base, find_id):
"""Find a matching base record with the test identifier, handling tricky cases.
NCBI IDs https://en.wikipedia.org/wiki/FASTA_format#NCBI_identifiers
"""
# Straight matches for identifiers
if find_id in base:
return find_id
# NCBI style IDs in find_id
elif find_id and find_id.find("|") > 0:
for test_id in [x.strip() for x in find_id.split("|")[1:]]:
if test_id and test_id in base:
return test_id
# NCBI style IDs in base IDs
else:
for base_id in base.keys():
if base_id.find("|") > 0:
for test_id in [x.strip() for x in base_id.split("|")[1:]]:
if test_id and test_id == find_id:
return base_id
return None
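# Illustrative match with an invented NCBI-style identifier:
#   base = {'NC_000001.1': rec}
#   _get_matching_record_id(base, 'gi|123|ref|NC_000001.1|')
#     -> 'NC_000001.1'
# and conversely a plain find_id matches a pipe-delimited base key.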
def _add_seqs(self, base, recs):
"""Add sequence information contained in the GFF3 to records.
"""
for rec in recs:
match_id = self._get_matching_record_id(base, rec.id)
if match_id:
base[match_id].seq = rec.seq
else:
base[rec.id] = rec
return base
def _add_parent_child_features(self, base, parents, children):
"""Add nested features with parent child relationships.
"""
multi_remap = self._identify_dup_ids(parents)
# add children features
children_prep = collections.defaultdict(list)
for child_dict in children:
child_feature = self._get_feature(child_dict)
for pindex, pid in enumerate(child_feature.qualifiers['Parent']):
if pid in multi_remap:
pid = multi_remap[pid].remap_id(child_dict)
child_feature.qualifiers['Parent'][pindex] = pid
children_prep[pid].append((child_dict['rec_id'],
child_feature))
children = dict(children_prep)
# add children to parents that exist
for cur_parent_dict in parents:
cur_id = cur_parent_dict['id']
if cur_id in multi_remap:
cur_parent_dict['id'] = multi_remap[cur_id].remap_id(
cur_parent_dict)
cur_parent, base = self._add_toplevel_feature(base, cur_parent_dict)
cur_parent, children = self._add_children_to_parent(cur_parent,
children)
# create parents for children without them (GFF2 or split/bad files)
while len(children) > 0:
parent_id, cur_children = next(itertools.islice(children.items(), 1))
# one child, do not nest it
if len(cur_children) == 1:
rec_id, child = cur_children[0]
loc = (child.location.start, child.location.end)
rec, base = self._get_rec(base,
dict(rec_id=rec_id, location=loc))
rec.features.append(child)
del children[parent_id]
else:
cur_parent, base = self._add_missing_parent(base, parent_id,
cur_children)
cur_parent, children = self._add_children_to_parent(cur_parent,
children)
return base
def _identify_dup_ids(self, parents):
"""Identify duplicated ID attributes in potential nested parents.
According to the GFF3 spec ID attributes are supposed to be unique
for a file, but this is not always true in practice. This looks
for duplicates, and provides unique IDs sorted by locations.
"""
multi_ids = collections.defaultdict(list)
for parent in parents:
multi_ids[parent['id']].append(parent)
multi_ids = [(mid, ps) for (mid, ps) in multi_ids.items()
if len(ps) > 1]
multi_remap = dict()
for mid, parents in multi_ids:
multi_remap[mid] = _MultiIDRemapper(mid, parents)
return multi_remap
def _add_children_to_parent(self, cur_parent, children):
"""Recursively add children to parent features.
"""
if cur_parent.id in children:
cur_children = children[cur_parent.id]
ready_children = []
for _, cur_child in cur_children:
cur_child, _ = self._add_children_to_parent(cur_child, children)
ready_children.append(cur_child)
# Support pre-1.62 Biopython, which had no CompoundLocation and
# instead flagged joined locations with location_operator
if not hasattr(SeqFeature, "CompoundLocation"):
cur_parent.location_operator = "join"
for cur_child in ready_children:
cur_parent.sub_features.append(cur_child)
del children[cur_parent.id]
return cur_parent, children
def _add_annotations(self, base, anns):
"""Add annotation data from the GFF file to records.
"""
# add these as a list of annotations, checking not to overwrite
# current values
for ann in anns:
rec, base = self._get_rec(base, ann)
for key, vals in ann['quals'].items():
self._add_ann_to_rec(rec, key, vals)
return base
def _add_ann_to_rec(self, rec, key, vals):
"""Add a key/value annotation to the given SeqRecord.
"""
if key in rec.annotations:
try:
rec.annotations[key].extend(vals)
except AttributeError:
rec.annotations[key] = [rec.annotations[key]] + vals
else:
rec.annotations[key] = vals
def _get_rec(self, base, info_dict):
"""Retrieve a record to add features to.
"""
max_loc = info_dict.get('location', (0, 1))[1]
match_id = self._get_matching_record_id(base, info_dict['rec_id'])
if match_id:
cur_rec = base[match_id]
# update generated unknown sequences with the expected maximum length
if unknown_seq_avail and isinstance(cur_rec.seq, UnknownSeq):
cur_rec.seq._length = max([max_loc, cur_rec.seq._length])
elif not unknown_seq_avail and isinstance(cur_rec.seq._data, _UndefinedSequenceData):
cur_rec.seq._data._length = max([max_loc, cur_rec.seq._data._length])
return cur_rec, base
elif self._create_missing:
if unknown_seq_avail:
new_rec = SeqRecord(UnknownSeq(max_loc), info_dict['rec_id'])
else:
new_rec = SeqRecord(Seq(None, length=max_loc), info_dict['rec_id'])
base[info_dict['rec_id']] = new_rec
return new_rec, base
else:
raise KeyError("Did not find matching record in %s for %s" %
(base.keys(), info_dict))
def _add_missing_parent(self, base, parent_id, cur_children):
"""Add a new feature that is missing from the GFF file.
"""
base_rec_id = list(set(c[0] for c in cur_children))
child_strands = list(set(c[1].location.strand for c in cur_children))
inferred_strand = child_strands[0] if len(child_strands) == 1 else None
assert len(base_rec_id) > 0
feature_dict = dict(id=parent_id, strand=inferred_strand,
type="inferred_parent", quals=dict(ID=[parent_id]),
rec_id=base_rec_id[0])
coords = [(c.location.start, c.location.end)
for r, c in cur_children]
feature_dict["location"] = (min([c[0] for c in coords]),
max([c[1] for c in coords]))
return self._add_toplevel_feature(base, feature_dict)
def _add_toplevel_feature(self, base, feature_dict):
"""Add a toplevel non-nested feature to the appropriate record.
"""
new_feature = self._get_feature(feature_dict)
rec, base = self._get_rec(base, feature_dict)
rec.features.append(new_feature)
return new_feature, base
def _get_feature(self, feature_dict):
"""Retrieve a Biopython feature from our dictionary representation.
"""
rstart, rend = feature_dict['location']
new_feature = SeqFeature.SeqFeature(
SeqFeature.SimpleLocation(start=rstart, end=rend,
strand=feature_dict['strand']),
feature_dict['type'], id=feature_dict['id'])
# Support for Biopython 1.68 and above, which removed sub_features
if not hasattr(new_feature, "sub_features"):
new_feature.sub_features = []
new_feature.qualifiers = feature_dict['quals']
return new_feature
def _parse_fasta(self, in_handle):
"""Parse FASTA sequence information contained in the GFF3 file.
"""
return list(SeqIO.parse(in_handle, "fasta"))
class _GFFParserLocalOut:
"""Provide a collector for local GFF MapReduce file parsing.
"""
def __init__(self, smart_breaks=False):
self._items = dict()
self._smart_breaks = smart_breaks
self._missing_keys = collections.defaultdict(int)
self._last_parent = None
self.can_break = True
self.num_lines = 0
def add(self, key, vals):
if self._smart_breaks:
# if we are not GFF2 we expect parents and break
# based on not having missing ones
if key == 'directive':
if vals[0] == '#':
self.can_break = True
self._last_parent = None
elif not vals[0].get("is_gff2", False):
self._update_missing_parents(key, vals)
self.can_break = (len(self._missing_keys) == 0)
# break when we are done with stretches of child features
elif key != 'child':
self.can_break = True
self._last_parent = None
# break when we have lots of child features in a row
# and change between parents
else:
cur_parent = vals[0]["quals"]["Parent"][0]
if (self._last_parent):
self.can_break = (cur_parent != self._last_parent)
self._last_parent = cur_parent
self.num_lines += 1
try:
self._items[key].extend(vals)
except KeyError:
self._items[key] = vals
def _update_missing_parents(self, key, vals):
# smart way of deciding if we can break this.
# if this is too much, can go back to not breaking in the
# middle of children
if key in ["child"]:
for val in vals:
for p_id in val["quals"]["Parent"]:
self._missing_keys[p_id] += 1
for val in vals:
try:
del self._missing_keys[val["quals"]["ID"][0]]
except KeyError:
pass
def has_items(self):
return len(self._items) > 0
def get_results(self):
self._last_parent = None
return self._items
class GFFParser(_AbstractMapReduceGFF):
"""Local GFF parser providing standardized parsing of GFF3 and GFF2 files.
"""
def __init__(self, line_adjust_fn=None, create_missing=True):
_AbstractMapReduceGFF.__init__(self, create_missing=create_missing)
self._line_adjust_fn = line_adjust_fn
def _gff_process(self, gff_files, limit_info, target_lines):
"""Process GFF addition without any parallelization.
In addition to limit filtering, this accepts a target_lines argument
which provides a number of lines to parse before returning results.
This allows partial parsing of a file to prevent memory issues.
"""
line_gen = self._file_line_generator(gff_files)
for out in self._lines_to_out_info(line_gen, limit_info, target_lines):
yield out
def _file_line_generator(self, gff_files):
"""Generate single lines from a set of GFF files.
"""
for gff_file in gff_files:
if hasattr(gff_file, "read"):
need_close = False
in_handle = gff_file
else:
need_close = True
in_handle = open(gff_file)
while True:
line = in_handle.readline()
if not line:
break
yield line
if need_close:
in_handle.close()
def _lines_to_out_info(self, line_iter, limit_info=None,
target_lines=None):
"""Generate SeqRecord and SeqFeatures from GFF file lines.
"""
params = self._examiner._get_local_params(limit_info)
out_info = _GFFParserLocalOut((target_lines is not None and
target_lines > 1))
found_seqs = False
for line in line_iter:
results = self._map_fn(line, params)
if self._line_adjust_fn and results:
if results[0][0] not in ['directive']:
results = [(results[0][0],
self._line_adjust_fn(results[0][1]))]
self._reduce_fn(results, out_info, params)
if (target_lines and out_info.num_lines >= target_lines and
out_info.can_break):
yield out_info.get_results()
out_info = _GFFParserLocalOut((target_lines is not None and
target_lines > 1))
if (results and results[0][0] == 'directive' and
results[0][1] == 'FASTA'):
found_seqs = True
break
class FakeHandle:
def __init__(self, line_iter):
self._iter = line_iter
def __iter__(self):
return self
def __next__(self):
return next(self._iter)
next = __next__
def read(self, size=-1):
if size < 0:
return "".join(l for l in self._iter)
elif size == 0:
return "" # Used by Biopython to sniff unicode vs bytes
else:
raise NotImplementedError
def readline(self):
try:
return next(self._iter)
except StopIteration:
return ""
if found_seqs:
fasta_recs = self._parse_fasta(FakeHandle(line_iter))
out_info.add('fasta', fasta_recs)
if out_info.has_items():
yield out_info.get_results()
class DiscoGFFParser(_AbstractMapReduceGFF):
"""GFF Parser with parallelization through Disco (http://discoproject.org.
"""
def __init__(self, disco_host, create_missing=True):
"""Initialize parser.
disco_host - Web reference to a Disco host which will be used for
parallelizing the GFF reading job.
"""
_AbstractMapReduceGFF.__init__(self, create_missing=create_missing)
self._disco_host = disco_host
def _gff_process(self, gff_files, limit_info, target_lines=None):
"""Process GFF addition, using Disco to parallelize the process.
"""
assert target_lines is None, "Cannot split parallelized jobs"
# make these imports local; only need them when using disco
import simplejson
import disco
# absolute path names unless they are special disco files
full_files = []
for f in gff_files:
if f.split(":")[0] != "disco":
full_files.append(os.path.abspath(f))
else:
full_files.append(f)
results = disco.job(self._disco_host, name="gff_reader",
input=full_files,
params=disco.Params(limit_info=limit_info, jsonify=True,
filter_info=self._examiner._filter_info),
required_modules=["simplejson", "collections", "re"],
map=self._map_fn, reduce=self._reduce_fn)
processed = dict()
for out_key, out_val in disco.result_iterator(results):
processed[out_key] = simplejson.loads(out_val)
yield processed
def parse(gff_files, base_dict=None, limit_info=None, target_lines=None):
"""High level interface to parse GFF files into SeqRecords and SeqFeatures.
"""
parser = GFFParser()
for rec in parser.parse_in_parts(gff_files, base_dict, limit_info,
target_lines):
yield rec
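# Typical use, with a hypothetical file path and limit choice:
#   for rec in parse("my.gff3", limit_info={"gff_type": ["gene"]}):
#       for feature in rec.features:
#           print(rec.id, feature.type, feature.location)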
def parse_simple(gff_files, limit_info=None):
"""Parse GFF files as line by line dictionary of parts.
"""
parser = GFFParser()
for rec in parser.parse_simple(gff_files, limit_info=limit_info):
if "child" in rec:
assert "parent" not in rec
yield rec["child"][0]
elif "parent" in rec:
yield rec["parent"][0]
elif "feature" in rec:
yield rec["feature"][0]
# ignore directive lines
else:
assert "directive" in rec
def _file_or_handle(fn):
"""Decorator to handle either an input handle or a file.
"""
def _file_or_handle_inside(*args, **kwargs):
in_file = args[1]
if hasattr(in_file, "read"):
need_close = False
in_handle = in_file
if six.PY3 and not isinstance(in_handle, io.TextIOBase):
raise TypeError('input handle must be opened in text mode')
else:
need_close = True
in_handle = open(in_file)
args = (args[0], in_handle) + args[2:]
out = fn(*args, **kwargs)
if need_close:
in_handle.close()
return out
return _file_or_handle_inside
class GFFExaminer:
"""Provide high level details about a GFF file to refine parsing.
GFF is a spec and is provided by many different centers. Real life files
will present the same information in slightly different ways. Becoming
familiar with the file you are dealing with is the best way to extract the
information you need. This class provides high level summary details to
help in learning.
"""
def __init__(self):
self._filter_info = dict(gff_id = [0], gff_source_type = [1, 2],
gff_source = [1], gff_type = [2])
def _get_local_params(self, limit_info=None):
class _LocalParams:
def __init__(self):
self.jsonify = False
params = _LocalParams()
params.limit_info = limit_info
params.filter_info = self._filter_info
return params
@_file_or_handle
def available_limits(self, gff_handle):
"""Return dictionary information on possible limits for this file.
This returns a nested dictionary with the following structure:
keys -- names of items to filter by
values -- dictionary with:
keys -- filter choice
value -- counts of that filter in this file
Not a parallelized map-reduce implementation.
"""
cur_limits = dict()
for filter_key in self._filter_info.keys():
cur_limits[filter_key] = collections.defaultdict(int)
for line in gff_handle:
# when we hit FASTA sequences, we are done with annotations
if line.startswith("##FASTA"):
break
# ignore empty and comment lines
if line.strip() and line.strip()[0] != "#":
parts = [p.strip() for p in line.split('\t')]
assert len(parts) >= 8, line
parts = parts[:9]
for filter_key, cur_indexes in self._filter_info.items():
cur_id = tuple([parts[i] for i in cur_indexes])
cur_limits[filter_key][cur_id] += 1
# get rid of the default dicts
final_dict = dict()
for key, value_dict in cur_limits.items():
if len(key) == 1:
key = key[0]
final_dict[key] = dict(value_dict)
gff_handle.close()
return final_dict
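# Example of the returned summary, with invented counts:
#   GFFExaminer().available_limits("my.gff3")
#     -> {'gff_id': {('I',): 2000, ('II',): 1500},
#         'gff_source': {('WormBase',): 3500},
#         'gff_type': {('gene',): 120, ('exon',): 900},
#         'gff_source_type': {('WormBase', 'gene'): 120, ...}}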
@_file_or_handle
def parent_child_map(self, gff_handle):
"""Provide a mapping of parent to child relationships in the file.
Returns a dictionary of parent child relationships:
keys -- tuple of (source, type) for each parent
values -- list of (source, type) tuples for the children of that parent
Not a parallelized map-reduce implementation.
"""
# collect all of the parent and child types mapped to IDs
parent_sts = dict()
child_sts = collections.defaultdict(list)
for line in gff_handle:
# when we hit FASTA sequences, we are done with annotations
if line.startswith("##FASTA"):
break
if line.strip() and not line.startswith("#"):
line_type, line_info = _gff_line_map(line,
self._get_local_params())[0]
if (line_type == 'parent' or (line_type == 'child' and
line_info['id'])):
parent_sts[line_info['id']] = (
line_info['quals'].get('source', [""])[0], line_info['type'])
if line_type == 'child':
for parent_id in line_info['quals']['Parent']:
child_sts[parent_id].append((
line_info['quals'].get('source', [""])[0], line_info['type']))
# generate a dictionary of the unique final type relationships
pc_map = collections.defaultdict(list)
for parent_id, parent_type in parent_sts.items():
for child_type in child_sts[parent_id]:
pc_map[parent_type].append(child_type)
pc_final_map = dict()
for ptype, ctypes in pc_map.items():
unique_ctypes = list(set(ctypes))
unique_ctypes.sort()
pc_final_map[ptype] = unique_ctypes
return pc_final_map
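# Example output shape, with invented source/type pairs:
#   {('WormBase', 'gene'): [('WormBase', 'mRNA')],
#    ('WormBase', 'mRNA'): [('WormBase', 'CDS'), ('WormBase', 'exon')]}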