File: amplicon_contingency_table.py

package info (click to toggle)
swarm-cluster 3.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,140 kB
  • sloc: cpp: 7,092; python: 185; makefile: 64; sh: 12
file content (101 lines) | stat: -rw-r--r-- 3,741 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
    Read all fasta files and build a sorted amplicon contingency
    table. Usage: python3 amplicon_contingency_table.py samples_*.fas
"""

__author__ = "Frédéric Mahé <frederic.mahe@cirad.fr>"
__date__ = "2019/09/24"
__version__ = "$Revision: 3.0"

import os
import sys
import operator

#*****************************************************************************#
#                                                                             #
#                                  Functions                                  #
#                                                                             #
#*****************************************************************************#


def fasta_parse():
    """
    Map amplicon ids, abundances and samples
    """
    separator = ";size="
    fasta_files = sys.argv[1:]
    all_amplicons = dict()
    samples = dict()
    amplicons2samples = dict()
    for fasta_file in fasta_files:
        sample = os.path.basename(fasta_file)
        sample = os.path.splitext(sample)[0]
        samples[sample] = samples.get(sample, 0) + 1
        with open(fasta_file, "r") as fasta_file:
            for line in fasta_file:
                if line.startswith(">"):
                    amplicon, abundance = line.strip(">;\n").split(separator)
                    abundance = int(abundance)
                    if amplicon not in amplicons2samples:
                        amplicons2samples[amplicon] = {sample: abundance}
                    else:
                        # deal with duplicated samples
                        amplicons2samples[amplicon][sample] = amplicons2samples[amplicon].get(sample, 0) + abundance
                    all_amplicons[amplicon] = all_amplicons.get(amplicon, 0) + abundance

    # deal with duplicated samples
    duplicates = [sample for sample in samples if samples[sample] > 1]
    if duplicates:
        print("Warning: some samples are duplicated", file=sys.stderr)
        print("\n".join(duplicates), file=sys.stderr)
    samples = sorted(samples.keys())

    return all_amplicons, amplicons2samples, samples


def main():
    """
    Read all fasta files and build a sorted amplicon contingency table
    """
    # Parse command line
    all_amplicons, amplicons2samples, samples = fasta_parse()

    # Sort amplicons by decreasing abundance (and by amplicon name)
    sorted_all_amplicons = sorted(iter(all_amplicons.items()),
                                  key=operator.itemgetter(1, 0))
    sorted_all_amplicons.reverse()

    # Print table header
    print("amplicon", "\t".join(samples), "total", sep="\t", file=sys.stdout)

    # Print table content
    for amplicon, abundance in sorted_all_amplicons:
        abundances = [amplicons2samples[amplicon].get(sample, 0)
                      for sample in samples]
        total = sum(abundances)
        abundances = [str(i) for i in abundances]
        # Sanity check
        if total == abundance:
            print(amplicon, "\t".join(abundances), total, sep="\t",
                  file=sys.stdout)
        else:
            print("Abundance sum is not correct for this amplicon",
                  amplicon, abundance, total, file=sys.stderr)
            sys.exit(-1)

    return


#*****************************************************************************#
#                                                                             #
#                                     Body                                    #
#                                                                             #
#*****************************************************************************#

if __name__ == '__main__':

    main()

sys.exit(0)