1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
|
from __future__ import division
from random import shuffle, random, choice
import numpy
try:
from math import factorial
except ImportError: # python version < 2.6
from cogent.maths.stats.special import Gamma
factorial = lambda x: Gamma(x+1)
from cogent.maths.stats.special import igam
__author__ = "Hua Ying, Julien Epps and Gavin Huttley"
__copyright__ = "Copyright 2007-2012, The Cogent Project"
__credits__ = ["Julien Epps", "Hua Ying", "Gavin Huttley"]
__license__ = "GPL"
__version__ = "1.5.3"
__maintainer__ = "Gavin Huttley"
__email__ = "Gavin.Huttley@anu.edu.au"
__status__ = "Production"
def chi_square(x, p, df=1):
"""returns the chisquare statistic and it's probability"""
N = len(x)
end = N
sim = numpy.logical_not(numpy.logical_xor(x[0:end-p], x[p:end]))*1
s = ((numpy.ones((N-p,), float)-sim)**2).sum()
D = s/(N-p)
p_val = 1 - igam(df/2.0, D/2)
return D, p_val
def g_statistic(X, p=None, idx=None):
"""
return g statistic and p value
arguments:
X - the periodicity profile (e.g. DFT magnitudes, autocorrelation etc)
X needs to contain only those period values being considered,
i.e. only periods in the range [llim, ulim]
"""
# X should be real
X = abs(numpy.array(X))
if p is None:
power = X.max(0)
idx = X.argmax(0)
else:
assert idx is not None
power = X[idx]
g_obs = power/X.sum()
M = numpy.floor(1/g_obs)
pmax = len(X)
result = numpy.zeros((int(M+1),), float)
pmax_fact = factorial(pmax)
for index in xrange(1, min(pmax, int(M))+1):
v = (-1)**(index-1)*pmax_fact/factorial(pmax-index)/factorial(index)
v *= (1-index*g_obs)**(pmax-1)
result[index] = v
p_val = result.sum()
return g_obs, p_val
def _seq_to_symbols(seq, motifs, motif_length, result=None):
"""return symbolic represetation of the sequence
Arguments:
- seq: a sequence
- motifs: a list of sequence motifs
- motif_length: length of first motif
"""
if result is None:
result = numpy.zeros(len(seq), numpy.uint8)
else:
result.fill(0)
if motif_length is None:
motif_length = len(motifs[0])
for i in xrange(len(seq) - motif_length + 1):
if seq[i: i + motif_length] in motifs:
result[i] = 1
return result
try:
from cogent.maths._period import seq_to_symbols
#raise ImportError
except ImportError:
seq_to_symbols = _seq_to_symbols
class SeqToSymbols(object):
"""class for converting all occurrences of motifs in passed sequence
to 1/0 otherwise"""
def __init__(self, motifs, length=None, motif_length=None):
super(SeqToSymbols, self).__init__()
if type(motifs) == str:
motifs = [motifs]
self.motifs = motifs
self.length = length
self.motif_length = motif_length or len(motifs[0])
self.working = None
if length is not None:
self.setResultArray(length)
def setResultArray(self, length):
"""sets a result array for length"""
self.working = numpy.zeros(length, numpy.uint8)
self.length = length
def __call__(self, seq, result=None):
if result is None and self.working is None:
self.setResultArray(len(seq))
elif self.working is not None:
if len(seq) != self.working.shape[0]:
self.setResultArray(len(seq))
result = self.working
result.fill(0)
if type(seq) != str:
seq = ''.join(seq)
return seq_to_symbols(seq, self.motifs, self.motif_length, result)
def circular_indices(vector, start, length, num):
"""docstring for circular_indices"""
if start > length:
start = start-length
if start+num < length:
return vector[start: start+num]
# get all till end, then from beginning
return vector[start:] + vector[:start+num-length]
def sampled_places(block_size, length):
"""returns randomly sampled positions with block_size to make a new vector
with length
"""
# Main condition is to identify when a draw would run off end, we want to
# draw from beginning
num_seg, remainder = divmod(length, block_size)
vector = range(length)
result = []
for seg_num in xrange(num_seg):
i = choice(vector)
result += circular_indices(vector, i, length, block_size)
if remainder:
result += circular_indices(vector, i+block_size, length, remainder)
assert len(result) == length, len(result)
return result
def blockwise_bootstrap(signal, calc, block_size, num_reps, seq_to_symbols=None, num_stats=None):
"""returns observed statistic and the probability from the bootstrap
test of observing more `power' by chance than that estimated from the
observed signal
Arguments:
- signal: a series, can be a sequence object
- calc: function to calculate the period power, e.g. ipdft, hybrid,
auto_corr or any other statistic.
- block_size: size of contiguous values for resampling
- num_reps: number of randomly generated permutations
- seq_to_symbols: function to convert a sequence to 1/0. If not
provided, the raw data is used.
- num_stats: the number of statistics being evaluated for each
interation. Default to 1.
"""
signal_length = len(signal)
if seq_to_symbols is not None:
dtype='c'
else:
dtype=None # let numpy guess
signal = numpy.array(list(signal), dtype=dtype)
if seq_to_symbols is not None:
symbolic = seq_to_symbols(signal)
data = symbolic
else:
data = signal
obs_stat = calc(data)
if seq_to_symbols is not None:
if sum(symbolic) == 0:
p = [numpy.array([1.0, 1.0, 1.0]), 1.0][num_stats == 1]
return obs_stat, p
if num_stats is None:
try:
num_stats = calc.getNumStats()
except AttributeError:
num_stats = 1
if num_stats == 1:
count = 0
else:
count = numpy.zeros(num_stats)
for rep in range(num_reps):
# get sample positions
sampled_indices = sampled_places(block_size, signal_length)
new_signal = signal.take(sampled_indices)
if seq_to_symbols is not None:
symbolic = seq_to_symbols(new_signal)
data = symbolic
else:
data = new_signal
sim_stat = calc(data)
# count if > than observed
if num_stats > 1:
count[sim_stat >= obs_stat] += 1
elif sim_stat >= obs_stat:
count += 1
return obs_stat, count / num_reps
# def percrb4():
# """Return SNR and CRB for periodicity estimates from symbolic signals"""
# # TODO: complete the function according to Julien's percrb4.m
# pass
#
|