File: barcode_extraction.py

package info (click to toggle)
spades 3.13.1+dfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 22,172 kB
  • sloc: cpp: 136,213; ansic: 48,218; python: 16,809; perl: 4,252; sh: 2,115; java: 890; makefile: 507; pascal: 348; xml: 303
file content (123 lines) | stat: -rw-r--r-- 3,800 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
############################################################################
# Copyright (c) 2015 Saint Petersburg State University
# All Rights Reserved
# See file LICENSE for details.
############################################################################

import os.path
import sys
import logging

from id_generation import generate_ids
from string_dist_utils import lcs, dist


__author__ = 'anton'


class Barcode:
    """A barcode id together with the libraries (lists of file paths) it owns.

    ``id`` is the barcode identifier (a string when printed) and ``libs`` is
    a list of libraries, each library being a list of file paths.
    """

    def __init__(self, id, libs):
        """Store ``id`` and a private copy of ``libs``.

        Every inner library is copied as well as the outer sequence, because
        add_ps() rewrites paths in place and must not modify the lists owned
        by the caller.
        """
        self.id = id
        self.libs = [list(lib) for lib in libs]
        if id is None:
            # NOTE(review): set_lcs_id is not defined in this class as it
            # appears in this file; unless a subclass provides it, passing
            # id=None raises AttributeError here -- confirm intended source.
            self.set_lcs_id()

    def add_ps(self, prefix, suffix):
        """Wrap every path with ``prefix``/``suffix`` and make it absolute."""
        for lib in self.libs:
            lib[:] = [os.path.abspath(prefix + path + suffix) for path in lib]

    def __str__(self):
        """Return the id followed by all paths, separated by single spaces."""
        return self.id + " " + " ".join(" ".join(lib) for lib in self.libs)

def RemoveLabel(s, code, code_range):
    """Remove the first ``code`` + number label found in ``s``.

    The string is scanned left to right for an occurrence of ``code`` that is
    immediately followed by the decimal form of one of the values in
    ``code_range``. The first such occurrence is cut out and the remaining
    text is returned. If no label is found, ``s`` is returned unchanged.

    When several numbers match at the same position (e.g. "1" and "10" for
    the text "L10"), the longest one is removed so that no stray digits of
    the label are left behind.
    """
    # Materialise once: code_range may be any iterable and is reused for
    # every candidate position.
    labels = [str(i) for i in code_range]
    for pos in range(len(s)):
        if not s.startswith(code, pos):
            continue
        new_pos = pos + len(code)
        matching = [label for label in labels if s.startswith(label, new_pos)]
        if matching:
            longest = max(matching, key=len)
            return s[:pos] + s[new_pos + len(longest):]
    return s

def NormalizeR(s):
    """Return ``s`` with its first "R1" or "R2" label cut out by RemoveLabel.

    Presumably the label marks the read number of a paired file -- verify
    against the naming scheme of the input data.
    """
    label_numbers = [1, 2]
    return RemoveLabel(s, code="R", code_range=label_numbers)

def NormalizeLR(s):
    """Strip an "R" label (via NormalizeR) and then an "L" label from ``s``.

    The "L" label is looked up with RemoveLabel using the numbers 1..19.
    """
    without_r = NormalizeR(s)
    return RemoveLabel(without_r, "L", range(1, 20))

def check_int_ids(ids):
    """Tell whether the second item of every entry in ``ids`` is all digits.

    ``ids`` is an iterable of indexable entries; only element ``[1]`` of each
    entry is inspected. An empty ``ids`` yields True.
    """
    return all(entry[1].isdigit() for entry in ids)

def generate_barcode_list(barcodes):
    """Pair every barcode with a "BC_"-prefixed short id.

    Short ids come from generate_ids(barcodes). When all of them are purely
    numeric the pairs are ordered by their integer value; otherwise the
    input order is kept. Returns a list of ``(barcode, "BC_" + short_id)``.
    """
    pairs = list(zip(barcodes, generate_ids(barcodes)))
    if check_int_ids(pairs):
        pairs.sort(key=lambda pair: int(pair[1]))
    return [(barcode, "BC_" + short_id) for barcode, short_id in pairs]

def Normalize(file_path):
    """Return the file name of ``file_path`` after NormalizeLR label removal."""
    file_name = os.path.basename(file_path)
    return NormalizeLR(file_name)

def GroupBy(norm, l):
    """Group the items of ``l`` by the key ``norm(item)``.

    Returns a dict mapping each key to the list of items that produced it,
    with items kept in their original order.
    """
    groups = {}
    for item in l:
        groups.setdefault(norm(item), []).append(item)
    return groups

def CheckSameSize(iter, size = -1):
    """Check that every element of ``iter`` has the same length.

    With the default ``size`` of -1 the length of the first element is used
    as the reference; otherwise every element must have length ``size``.
    Returns True for an empty ``iter``.
    """
    expected = size
    for element in iter:
        current = len(element)
        if expected == -1:
            # First element fixes the reference length.
            expected = current
        elif current != expected:
            return False
    return True

#todo: write better code
def ExtractBarcodes(dirs):
    """Build Barcode objects from the regular files found in ``dirs``.

    Files are grouped into barcodes by their Normalize()d file name. Within
    a barcode, files are grouped into libraries by their path with the "R"
    label removed from the file name. Returns a list of Barcode objects, or
    None when the barcodes do not all have the same number of files or when
    some library does not consist of exactly two files.
    """
    def pair_key(path):
        # Strip the "R" label from the file name only: a directory whose
        # name happens to contain "R1"/"R2" must not swallow the match.
        directory, name = os.path.split(path)
        return os.path.join(directory, NormalizeR(name))

    files = []
    for directory in dirs:
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # order of files inside each library (and of the libraries
        # themselves) reproducible.
        for name in sorted(os.listdir(directory)):
            path = os.path.join(directory, name)
            if os.path.isfile(path):
                files.append(os.path.abspath(path))
    barcode_dict = GroupBy(Normalize, files)
    if not CheckSameSize(barcode_dict.values()):
        return None
    for bid in list(barcode_dict):
        barcode_dict[bid] = list(GroupBy(pair_key, barcode_dict[bid]).values())
        if not CheckSameSize(barcode_dict[bid], 2):
            return None
    short_barcodes = generate_barcode_list(list(barcode_dict.keys()))
    return [Barcode(short, barcode_dict[bid]) for bid, short in short_barcodes]

def ReadDataset(file, log = logging.getLogger("ReadDataset")):
    """Read a dataset description file into a list of Barcode objects.

    Every non-blank line has the form ``<id> <path> <path> [<path> <path>
    ...]``: the first token is the barcode id and the remaining tokens are
    taken two at a time as libraries. A line with an odd number of paths
    raises IndexError.

    If ``file`` is not an existing regular file, a message is logged through
    ``log`` and the process exits with status 1.
    """
    log.info("Reading dataset from " + file + "\n")
    if not os.path.isfile(file):
        log.info("Error: Dataset file does not exist\n" + file + "\n")
        sys.exit(1)
    result = []
    # Iterate the file object directly: xreadlines() exists only in
    # Python 2. The with-block closes the file even if parsing fails.
    with open(file, "r") as f:
        for line in f:
            split = line.split()
            if not split:
                continue
            id = split[0]
            datasets = []
            for i in range(1, len(split), 2):
                datasets.append([split[i], split[i + 1]])
            result.append(Barcode(id, datasets))
    return result

def print_dataset(dataset, output_file, log):
    """Write ``dataset`` to ``output_file``, one stripped entry per line.

    Each entry is converted with str() and stripped of surrounding
    whitespace; the file always ends with a newline. Progress is reported
    through ``log.info``.
    """
    log.info("Printing dataset to " + output_file)
    content = "\n".join([str(line).strip() for line in dataset]) + "\n"
    # The with-block guarantees the data is flushed and the handle closed.
    with open(output_file, "w") as out:
        out.write(content)