File: Template.py

package info (click to toggle)
python-biopython 1.68%2Bdfsg-3~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 46,856 kB
  • sloc: python: 160,306; xml: 93,216; ansic: 9,118; sql: 1,208; makefile: 155; sh: 63
file content (228 lines) | stat: -rw-r--r-- 7,528 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# Copyright 2007 by Tiago Antao.  All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license.  Please see the LICENSE file that should have been included
# as part of this package.

"""Utility code for using SimCoal (DEPRECATED)."""

from __future__ import print_function

from os import sep
import re
from functools import reduce

from Bio.PopGen.SimCoal import builtin_tpl_dir


def exec_template(template):
    executed_template = template
    match = re.search('!!!(.*?)!!!', executed_template, re.MULTILINE)
    # while len(match.groups())>0:
    while match:
        exec_result = str(eval(match.groups()[0]))
        executed_template = executed_template.replace(
               '!!!' + match.groups()[0] + '!!!',
               exec_result, 1)
        match = re.search('!!!(.*?)!!!', executed_template, re.MULTILINE)
        # match = patt.matcher(String(executed_template))
    return executed_template


def process_para(in_string, out_file_prefix, para_list, curr_values):
    if (para_list == []):
        template = in_string
        f_name = out_file_prefix
        # f_name += '_' + str(total_size)
        for tup in curr_values:
            name, val = tup
            f_name += '_' + str(val)
            # reg = re.compile('\?' + name, re.MULTILINE)
            # template = re.sub(reg, str(val), template)
            template = template.replace('?' + name, str(val))
        with open(f_name + '.par', 'w') as f:
            # executed_template = template
            executed_template = exec_template(template)
            clean_template = executed_template.replace('\r\n', '\n').replace('\n\n', '\n')
            f.write(clean_template)
        return [f_name]
    else:
        name, rng = para_list[0]
        fnames = []
        for val in rng:
            new_values = [(name, val)]
            new_values.extend(curr_values)
            more_names = process_para(in_string, out_file_prefix, para_list[1:], new_values)
            fnames.extend(more_names)
        return fnames


def dupe(motif, times):
    ret_str = ''
    for i in range(1, times + 1):
        ret_str += motif + '\r\n'
    return ret_str


def get_xy_from_matrix(x_max, y_max, pos):
    y = (pos - 1) / x_max
    x = (pos - 1) % x_max
    return x, y


def get_step_2d(x_max, y_max, x, y, mig):
    my_x, my_y = get_xy_from_matrix(x_max, y_max, y)
    other_x, other_y = get_xy_from_matrix(x_max, y_max, x)

    if (my_x - other_x) ** 2 + (my_y - other_y) ** 2 == 1:
        return str(mig) + ' '
    else:
        return '0 '


def generate_ssm2d_mat(x_max, y_max, mig):
    mig_mat = ''
    for x in range(1, x_max * y_max + 1):
        for y in range(1, x_max * y_max + 1):
            mig_mat += get_step_2d(x_max, y_max, x, y, mig)
        mig_mat += "\r\n"
    return mig_mat


def generate_island_mat(total_size, mig):
    mig_mat = ''
    for x in range(1, total_size + 1):
        for y in range(1, total_size + 1):
            if (x == y):
                mig_mat += '0 '
            else:
                mig_mat += '!!!' + str(mig) + '!!! '
        mig_mat += "\r\n"
    return mig_mat


def generate_null_mat(total_size):
    null_mat = ''
    for x in range(1, total_size + 1):
        for y in range(1, total_size + 1):
            null_mat += '0 '
        null_mat += '\r\n'
    return null_mat


def generate_join_events(t, total_size, join_size, orig_size):
    events = ''
    for i in range(1, total_size - 1):
        events += str(t) + ' ' + str(i) + ' 0 1 1 0 1\r\n'
    events += str(t) + ' ' + str(total_size - 1) + ' 0 1 ' + str(1.0 * total_size * join_size / orig_size) + ' 0 1\r\n'
    return events


def no_processor(in_string):
    return in_string


def process_text(in_string, out_file_prefix, para_list, curr_values,
                 specific_processor):
    text = specific_processor(in_string)
    return process_para(text, out_file_prefix, para_list, [])


# def prepare_dir():
#    try:
#        mkdir(sep.join([Config.dataDir, 'SimCoal'])) # Should exist, but...
#    except OSError:
#        pass # Its ok if already exists
#    try:
#        mkdir(sep.join([Config.dataDir, 'SimCoal', 'runs']))
#    except OSError:
#        pass # Its ok if already exists


# sep is because of jython
def generate_model(par_stream, out_prefix, params,
                   specific_processor=no_processor, out_dir='.'):
    # prepare_dir()
    text = par_stream.read()
    out_file_prefix = sep.join([out_dir, out_prefix])
    return process_text(text, out_file_prefix, params, [], specific_processor)


def get_demography_template(stream, model, tp_dir=None):
    """Gets a demograpy template.

    Most probably this model needs to be sent to GenCases.

    - stream - Writable stream.
    - param  - Template file.
    - tp_dir - Directory where to find the template, if None
               use an internal template
    """
    if tp_dir is None:
        # Internal Template
        filename = sep.join([builtin_tpl_dir, model + '.par'])
    else:
        # External template
        filename = sep.join([tp_dir, model + '.par'])
    with open(filename, 'r') as f:
        l = f.readline()
        while l != '':
            stream.write(l)
            l = f.readline()


def _gen_loci(stream, loci):
    stream.write('//Number of contiguous linkage blocks in chromosome\n')
    stream.write(str(len(loci)) + '\n')
    stream.write('//Per Block: Data type, No. of loci, Recombination rate to the right-side locus, plus optional parameters\n')
    for locus in loci:
        stream.write(' '.join([locus[0]] +
            [str(x) for x in list(locus[1])]) + '\n')


def get_chr_template(stream, chrs):
    """Writes a Simcoal2 loci template part.

    stream - Writable stream.
    chr    - Chromosome list.

    Current loci list:

          - [(chr_repeats,[(marker, (params))])]

              - chr_repeats --> Number of chromosome repeats
              - marker  --> 'SNP', 'DNA', 'RFLP', 'MICROSAT'
              - params  --> Simcoal2 parameters for markers (list of floats
                or ints - if to be processed by generate_model)
    """
    num_chrs = reduce(lambda x, y: x + y[0], chrs, 0)
    stream.write('//Number of independent (unlinked) chromosomes, and "chromosome structure" flag:  0 for identical structure across chromosomes, and  1 for different structures on different chromosomes.\n')
    if len(chrs) > 1 or num_chrs == 1:
        stream.write(str(num_chrs) + ' 1\n')
    else:
        stream.write(str(num_chrs) + ' 0\n')
    for chr in chrs:
        repeats = chr[0]
        loci = chr[1]
        if len(chrs) == 1:
            _gen_loci(stream, loci)
        else:
            for i in range(repeats):
                _gen_loci(stream, loci)


def generate_simcoal_from_template(model, chrs, params, out_dir='.', tp_dir=None):
    """Writes a complete SimCoal2 template file.

    This joins together get_demography_template and get_chr_template,
    which are feed into generate_model
    Please check the three functions for parameters (model from
    get_demography_template, chrs from get_chr_template and
    params from generate_model).
    """
    with open(out_dir + sep + 'tmp.par', 'w') as stream:
        get_demography_template(stream, model, tp_dir)
        get_chr_template(stream, chrs)
    # with open(out_dir + sep + 'tmp.par', 'r') as par_stream:
        # print par_stream.read()
    with open(out_dir + sep + 'tmp.par', 'r') as par_stream:
        generate_model(par_stream, model, params, out_dir=out_dir)