1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
|
# Copyright 2007 by Tiago Antao. All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license. Please see the LICENSE file that should have been included
# as part of this package.
"""Utility code for using SimCoal (DEPRECATED)."""
from __future__ import print_function
from os import sep
import re
from functools import reduce
from Bio.PopGen.SimCoal import builtin_tpl_dir
def exec_template(template):
executed_template = template
match = re.search('!!!(.*?)!!!', executed_template, re.MULTILINE)
# while len(match.groups())>0:
while match:
exec_result = str(eval(match.groups()[0]))
executed_template = executed_template.replace(
'!!!' + match.groups()[0] + '!!!',
exec_result, 1)
match = re.search('!!!(.*?)!!!', executed_template, re.MULTILINE)
# match = patt.matcher(String(executed_template))
return executed_template
def process_para(in_string, out_file_prefix, para_list, curr_values):
if (para_list == []):
template = in_string
f_name = out_file_prefix
# f_name += '_' + str(total_size)
for tup in curr_values:
name, val = tup
f_name += '_' + str(val)
# reg = re.compile('\?' + name, re.MULTILINE)
# template = re.sub(reg, str(val), template)
template = template.replace('?' + name, str(val))
with open(f_name + '.par', 'w') as f:
# executed_template = template
executed_template = exec_template(template)
clean_template = executed_template.replace('\r\n', '\n').replace('\n\n', '\n')
f.write(clean_template)
return [f_name]
else:
name, rng = para_list[0]
fnames = []
for val in rng:
new_values = [(name, val)]
new_values.extend(curr_values)
more_names = process_para(in_string, out_file_prefix, para_list[1:], new_values)
fnames.extend(more_names)
return fnames
def dupe(motif, times):
ret_str = ''
for i in range(1, times + 1):
ret_str += motif + '\r\n'
return ret_str
def get_xy_from_matrix(x_max, y_max, pos):
y = (pos - 1) / x_max
x = (pos - 1) % x_max
return x, y
def get_step_2d(x_max, y_max, x, y, mig):
my_x, my_y = get_xy_from_matrix(x_max, y_max, y)
other_x, other_y = get_xy_from_matrix(x_max, y_max, x)
if (my_x - other_x) ** 2 + (my_y - other_y) ** 2 == 1:
return str(mig) + ' '
else:
return '0 '
def generate_ssm2d_mat(x_max, y_max, mig):
mig_mat = ''
for x in range(1, x_max * y_max + 1):
for y in range(1, x_max * y_max + 1):
mig_mat += get_step_2d(x_max, y_max, x, y, mig)
mig_mat += "\r\n"
return mig_mat
def generate_island_mat(total_size, mig):
mig_mat = ''
for x in range(1, total_size + 1):
for y in range(1, total_size + 1):
if (x == y):
mig_mat += '0 '
else:
mig_mat += '!!!' + str(mig) + '!!! '
mig_mat += "\r\n"
return mig_mat
def generate_null_mat(total_size):
null_mat = ''
for x in range(1, total_size + 1):
for y in range(1, total_size + 1):
null_mat += '0 '
null_mat += '\r\n'
return null_mat
def generate_join_events(t, total_size, join_size, orig_size):
events = ''
for i in range(1, total_size - 1):
events += str(t) + ' ' + str(i) + ' 0 1 1 0 1\r\n'
events += str(t) + ' ' + str(total_size - 1) + ' 0 1 ' + str(1.0 * total_size * join_size / orig_size) + ' 0 1\r\n'
return events
def no_processor(in_string):
return in_string
def process_text(in_string, out_file_prefix, para_list, curr_values,
specific_processor):
text = specific_processor(in_string)
return process_para(text, out_file_prefix, para_list, [])
# def prepare_dir():
# try:
# mkdir(sep.join([Config.dataDir, 'SimCoal'])) # Should exist, but...
# except OSError:
# pass # Its ok if already exists
# try:
# mkdir(sep.join([Config.dataDir, 'SimCoal', 'runs']))
# except OSError:
# pass # Its ok if already exists
# sep is because of jython
def generate_model(par_stream, out_prefix, params,
specific_processor=no_processor, out_dir='.'):
# prepare_dir()
text = par_stream.read()
out_file_prefix = sep.join([out_dir, out_prefix])
return process_text(text, out_file_prefix, params, [], specific_processor)
def get_demography_template(stream, model, tp_dir=None):
"""Gets a demograpy template.
Most probably this model needs to be sent to GenCases.
- stream - Writable stream.
- param - Template file.
- tp_dir - Directory where to find the template, if None
use an internal template
"""
if tp_dir is None:
# Internal Template
filename = sep.join([builtin_tpl_dir, model + '.par'])
else:
# External template
filename = sep.join([tp_dir, model + '.par'])
with open(filename, 'r') as f:
l = f.readline()
while l != '':
stream.write(l)
l = f.readline()
def _gen_loci(stream, loci):
stream.write('//Number of contiguous linkage blocks in chromosome\n')
stream.write(str(len(loci)) + '\n')
stream.write('//Per Block: Data type, No. of loci, Recombination rate to the right-side locus, plus optional parameters\n')
for locus in loci:
stream.write(' '.join([locus[0]] +
[str(x) for x in list(locus[1])]) + '\n')
def get_chr_template(stream, chrs):
"""Writes a Simcoal2 loci template part.
stream - Writable stream.
chr - Chromosome list.
Current loci list:
- [(chr_repeats,[(marker, (params))])]
- chr_repeats --> Number of chromosome repeats
- marker --> 'SNP', 'DNA', 'RFLP', 'MICROSAT'
- params --> Simcoal2 parameters for markers (list of floats
or ints - if to be processed by generate_model)
"""
num_chrs = reduce(lambda x, y: x + y[0], chrs, 0)
stream.write('//Number of independent (unlinked) chromosomes, and "chromosome structure" flag: 0 for identical structure across chromosomes, and 1 for different structures on different chromosomes.\n')
if len(chrs) > 1 or num_chrs == 1:
stream.write(str(num_chrs) + ' 1\n')
else:
stream.write(str(num_chrs) + ' 0\n')
for chr in chrs:
repeats = chr[0]
loci = chr[1]
if len(chrs) == 1:
_gen_loci(stream, loci)
else:
for i in range(repeats):
_gen_loci(stream, loci)
def generate_simcoal_from_template(model, chrs, params, out_dir='.', tp_dir=None):
"""Writes a complete SimCoal2 template file.
This joins together get_demography_template and get_chr_template,
which are feed into generate_model
Please check the three functions for parameters (model from
get_demography_template, chrs from get_chr_template and
params from generate_model).
"""
with open(out_dir + sep + 'tmp.par', 'w') as stream:
get_demography_template(stream, model, tp_dir)
get_chr_template(stream, chrs)
# with open(out_dir + sep + 'tmp.par', 'r') as par_stream:
# print par_stream.read()
with open(out_dir + sep + 'tmp.par', 'r') as par_stream:
generate_model(par_stream, model, params, out_dir=out_dir)
|