File: LargeFileParser.py

package info (click to toggle)
python-biopython 1.68%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 46,860 kB
  • ctags: 13,237
  • sloc: python: 160,306; xml: 93,216; ansic: 9,118; sql: 1,208; makefile: 155; sh: 63
file content (115 lines) | stat: -rw-r--r-- 3,828 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Copyright 2010 by Tiago Antao.  All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license.  Please see the LICENSE file that should have been included
# as part of this package.

"""
Large file parsing of Genepop files

The standard parser loads the whole file into memory. This parser
provides an iterator over data.

Classes:
Record           Holds GenePop data.

Functions:
read             Parses a GenePop record (file) into a Record object.

"""


def get_indiv(line):
    """Split one individual's line into its name and allele tuples.

    The line must contain exactly one comma: the text before it is the
    individual's name (returned unchanged), the text after it holds the
    genotypes separated by spaces and/or tabs.

    Returns a tuple (indiv_name, allele_list, marker_len) where
    allele_list has one tuple of ints per locus (two ints for diploid
    data, one for haploid data) and marker_len is the number of digits
    used per allele (2 or 3).
    """
    indiv_name, genotype_text = line.split(',')
    tokens = [tok
              for tok in genotype_text.replace('\t', ' ').split(' ')
              if tok]
    # The width of the first genotype decides the code size: 2 or 4
    # characters means 2 digits per allele, anything else means 3.
    marker_len = 2 if len(tokens[0]) in (2, 4) else 3
    try:
        allele_list = []
        for tok in tokens:
            allele_list.append((int(tok[:marker_len]),
                                int(tok[marker_len:])))
    except ValueError:
        # The second half could not be parsed as an int: treat the
        # whole line as haploid and keep only the first allele.
        allele_list = [(int(tok[:marker_len]),) for tok in tokens]
    return indiv_name, allele_list, marker_len


def read(handle):
    """Parse the header of a GenePop file and return a lazy Record.

    Only the comment line, the loci names, the first ``POP`` marker and
    the first individual are consumed here; the remaining lines stay in
    the handle and are parsed on demand by ``Record.data_generator``.

    Arguments:
     - handle - a file-like object that contains a GenePop record.

    Returns a Record whose ``comment_line``, ``loci_list`` and
    ``marker_len`` are filled in and whose ``stack`` holds the lines
    already consumed that still have to be yielded.

    Raises ValueError (from ``get_indiv``) if the line following the
    first ``POP`` is not a valid individual line, e.g. because the file
    ends early.
    """
    record = Record(handle)
    record.comment_line = str(handle.readline()).rstrip()
    # Loci names come either one per line or all on a single line,
    # separated by commas and/or whitespace. Commas are turned into
    # spaces and the line is split on whitespace runs, so "a,b", "a, b"
    # and "a  b" all give two names and no empty name is produced.
    sample_loci_line = str(handle.readline()).rstrip()
    record.loci_list.extend(sample_loci_line.replace(',', ' ').split())
    line = handle.readline()
    while line != "":
        line = line.rstrip()
        if line.upper() == "POP":
            record.stack.append("POP")
            break
        record.loci_list.append(line)
        line = handle.readline()
    # Peek at the first individual to learn the allele code width, then
    # keep the line on the stack so data_generator still yields it.
    next_line = handle.readline().rstrip()
    record.marker_len = get_indiv(next_line)[2]
    record.stack.append(next_line)
    return record


class Record(object):
    """Holds information from a GenePop record that is read lazily.

    Members:
    handle             The handle the remaining lines are read from.

    marker_len         The marker length (2 or 3 digit code per allele).

    comment_line       Comment line.

    loci_list          List of loci names.

    populations        Initialised to an empty list; not filled in by
                       this class.

    stack              Lines already taken from the handle that
                       data_generator yields before reading further.

    data_generator     Iterates over population data.

    The generator will only work once. If you want to read a handle
    twice you have to re-open it!

    data_generator yields either () - an empty tuple - marking a new
    population, or an individual. An individual is something like
    ('Ind1', [(1, 1), (3, None), (200, 201)])
    In the case above the individual is called Ind1 and
    has three diploid loci. For the second locus, one of the alleles
    is unknown.

    """
    def __init__(self, handle):
        """Store the handle and start with empty header information."""
        self.handle = handle
        self.marker_len = 0
        self.comment_line = ""
        self.loci_list = []
        self.populations = []
        self.stack = []

    def data_generator(self):
        """Yield population markers and individuals in file order.

        Lines buffered in ``stack`` are processed first, then the rest
        of ``handle``. A ``POP`` line (in any case) yields ``()``; any
        other line is parsed with ``get_indiv`` and yields
        ``(indiv_name, allele_list)``, where an allele code of 0 is
        replaced by None.
        """
        for source in (self.stack, self.handle):
            for line in source:
                line = line.rstrip()
                if line.upper() == 'POP':
                    yield ()
                    continue
                indiv_name, allele_list, _ = get_indiv(line)
                clean_list = [
                    tuple(None if allele == 0 else allele
                          for allele in locus)
                    for locus in allele_list
                ]
                yield indiv_name, clean_list
        # Falling off the end finishes the generator. An explicit
        # "raise StopIteration()" here is turned into a RuntimeError
        # by PEP 479 (Python 3.7 and later).