File: bayes.py

package info (click to toggle)
python-seqcluster 1.2.7%2Bds-1
  • links: PTS, VCS
  • area: contrib
  • in suites: bullseye
  • size: 113,592 kB
  • sloc: python: 5,327; makefile: 184; sh: 122; javascript: 55
file content (98 lines) | stat: -rw-r--r-- 2,961 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""This file contains code for use with "Think Bayes",
by Allen B. Downey, available from greenteapress.com

Copyright 2012 Allen B. Downey
License: GNU GPLv3 http://www.gnu.org/licenses/gpl.html
"""
from collections import defaultdict
from seqcluster.libs.thinkbayes import Pmf
import numpy as np


class _update(Pmf):
    """A map from string bowl ID to probablity."""

    def __init__(self, hypos, loci):
        """Initialize self.
        hypos: sequence of string bowl IDs
        """
        self.loci = loci
        Pmf.__init__(self)
        for hypo in hypos:
            self.Set(hypo, 1)
            self.Normalize()

    def Update(self, data):
        """Updates the PMF with new data.
        data: string cookie type
        """
        for hypo in self.Values():
            like = self.Likelihood(data, hypo)
            self.Mult(hypo, like)
        self.Normalize()

    def Likelihood(self, data, hypo):
        """The likelihood of the data under the hypothesis.
        data: string cookie type
        hypo: string bowl ID
        """
        mix = self.loci[hypo]
        like = mix[data]
        return like


def _transform(seqs):
    seqs_in_c = defaultdict(dict)
    for s, c in seqs:
        seqs_in_c[s].update({c: seqs[(s, c)]})
    return seqs_in_c


def _dict_seq_locus(list_c, loci_obj, seq_obj):
    """
    return dict with sequences = [ cluster1, cluster2 ...]
    """
    seqs = defaultdict(set)
    # n = len(list_c.keys())
    for c in list_c.values():
        for l in c.loci2seq:
            [seqs[s].add(c.id) for s in c.loci2seq[l]]

    common = [s for s in seqs if len(seqs[s]) > 1]
    seqs_in_c = defaultdict(float)
    for c in list_c.values():
        for l in c.loci2seq:
            # total = sum([v for v in loci_obj[l].coverage.values()])
            for s in c.loci2seq[l]:
                if s in common:
                    pos = seq_obj[s].pos[l]
                    # cov = 1.0 * loci_obj[l].coverage[pos] / total
                    cov = 1.0 * loci_obj[l].coverage[pos]
                    if seqs_in_c[(s, c.id)] < cov:
                        seqs_in_c[(s, c.id)] = cov
    seqs_in_c = _transform(seqs_in_c)
    return seqs_in_c


def _bayes(loci):
    data = defaultdict(dict)
    hypos = loci.keys()
    for hypo in hypos:
        data[hypo] = dict(position=loci[hypo])
    pmf = _update(hypos, data)
    pmf.Update('position')
    return pmf


def decide_by_bayes(list_c, s2p):
    # for each cluster get seq ~ loci edges
    loci_obj, seq_obj = s2p
    seqs_in_c = _dict_seq_locus(list_c, loci_obj, seq_obj)
    for s in seqs_in_c:
        total = sum(seqs_in_c[s].values())
        norm_values = 1.0 * np.array(seqs_in_c[s].values()) / total
        prob = _bayes(dict(zip(seqs_in_c[s].keys(), norm_values)))
        for clus in prob:
            list_c[clus].idmembers[s] = prob.loci[clus]["position"]
            # print "%s %s %s" % (s, clus, list_c[clus].idmembers[s])
    return list_c