File: wfa.alg.rescore.py

package info (click to toggle)
libwfa2 2.3.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 10,072 kB
  • sloc: ansic: 13,812; python: 540; cpp: 500; makefile: 268; sh: 176; lisp: 41
file content (179 lines) | stat: -rw-r--r-- 6,423 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/python3
# PROJECT: Wavefront Alignments Algorithms 
# LICENCE: MIT License 
# AUTHOR(S): Santiago Marco-Sola <santiagomsola@gmail.com>
# DESCRIPTION: Rescore alignment (*.alg) files
# USAGE: python3 wfa.alg.rescore.py -h

import sys
import re
import enum
import argparse

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from scipy.constants.constants import alpha

################################################################################
# Distances & penalties
################################################################################
class Distance(enum.Enum):
    edit = 1
    gap_linear = 2
    gap_affine = 3
    gap_affine_2p = 4
    
class Penalties:
    def __init__(self):
      self.distance = Distance.edit
      self.match = 0
      self.mismatch = 0
      self.indel = 0
      self.gap_open1 = 0
      self.gap_extend1 = 0
      self.gap_open2 = 0
      self.gap_extend2 = 0
      
def cigar_compute_score_edit(cigar_vector,penalties):
  score = 0
  for op in cigar_vector:
    if op[1] in "DI": score += int(op[0])
    if op[1] in "X": score += int(op[0])
  return score

def cigar_compute_score_linear(cigar_vector,penalties):
  score = 0
  for op in cigar_vector:
    if op[1] == "M": score -= int(op[0]) * penalties.match
    if op[1] == "X": score -= int(op[0]) * penalties.mismatch
    if op[1] in "DI": score -= int(op[0]) * penalties.indel
  return score

def cigar_compute_score_affine(cigar_vector,penalties):
  score = 0
  for op in cigar_vector:
    if op[1] == "M": score -= int(op[0]) * penalties.match
    if op[1] == "X": score -= int(op[0]) * penalties.mismatch
    if op[1] in "DI": score -= penalties.gap_open1 + int(op[0]) * penalties.gap_extend1
  return score

def cigar_compute_score_affine_2p(cigar_vector,penalties):
  score = 0
  for op in cigar_vector:
    if op[1] == "M": score -= int(op[0]) * penalties.match
    if op[1] == "X": score -= int(op[0]) * penalties.mismatch
    if op[1] in "DI": 
      score1 = penalties.gap_open1 + int(op[0]) * penalties.gap_extend1
      score2 = penalties.gap_open2 + int(op[0]) * penalties.gap_extend2
      score -= min(score1,score2)
  return score
      
def cigar_compute_score(cigar,penalties):
  # Parse CIGAR
  #   cigar = "10M3D5M1I10M"
  #   cigar_vector = [('10', 'M'), ('3', 'D'), ('5', 'M'), ('1', 'I'), ('10', 'M')]
  cigar_vector = re.findall(r'(\d+)([MXDI])',cigar)
  # Evaluate CIGAR
  if penalties.distance == Distance.edit:
    return cigar_compute_score_edit(cigar_vector,penalties)
  elif penalties.distance == Distance.gap_linear:
    return cigar_compute_score_linear(cigar_vector,penalties)
  elif penalties.distance == Distance.gap_affine:
    return cigar_compute_score_affine(cigar_vector,penalties)
  elif penalties.distance == Distance.gap_affine_2p:
    return cigar_compute_score_affine_2p(cigar_vector,penalties)
  
################################################################################
# Compare both files
################################################################################
def rescore_alignments(input_path,output_path,penalties,verbose):
  # Open files
  input_file = open(input_path,"rt")
  output_file = open(output_path,"wt")
  # Read lines
  while True:
      # Read lines
      line = input_file.readline()
      if not line: break # End of file
      # Extract alignments
      fields = line.split()
      cigar = fields[1] if len(fields)<=2 else fields[5]
      # Evaluate CIGAR's score
      score = cigar_compute_score(cigar,penalties)
      # Print rescore and CIGAR 
      output_file.write("%d\t%s\n" % (score,cigar))
  # Close files
  input_file.close()
  output_file.close()
  
################################################################################
# Main
################################################################################
# Configure arguments
parser = argparse.ArgumentParser()
parser.add_argument('-i','--input',required=True,help='Input file (*.alg)')
parser.add_argument('-o','--output',required=True,help='Output file (*.alg)')
parser.add_argument('-e','--edit',
                    help='Score alignments using edit distance (--edit)')
parser.add_argument('-l','--gap-linear',
                    help='Score alignments using gap-linear distance (--gap-linear=M,X,I)')
parser.add_argument('-g','--gap-affine',
                    help='Score alignments using gap-affine distance (--gap-affine=M,X,O,E)')
parser.add_argument('-G','--gap-affine-2p',
                    help='Score alignments using gap-affine-2p distance (--gap-affine=M,X,O1,E1,O2,E2)')
parser.add_argument('-v','--verbose',action='store_true',default=False,
                    help='Display comparison differences (default=disable)')
parser.add_argument('-H',action='store_true',dest="human_readable",default=False)

# Parse arguments
args = parser.parse_args()

# Select distance
penalties = Penalties()
if args.edit is not None:
  pass # Default
elif args.gap_linear is not None:
  values = args.gap_linear.split(',')
  penalties.distance = Distance.gap_linear
  penalties.match = int(values[0])
  penalties.mismatch = int(values[1])
  penalties.indel = int(values[2])
elif args.gap_affine is not None:
  values = args.gap_affine.split(',')
  penalties.distance = Distance.gap_affine
  penalties.match = int(values[0])
  penalties.mismatch = int(values[1])
  penalties.gap_open1 = int(values[2])
  penalties.gap_extend1 = int(values[3])
elif args.gap_affine_2p is not None:
  values = args.gap_affine_2p.split(',')
  penalties.distance = Distance.gap_affine_2p
  penalties.match = int(values[0])
  penalties.mismatch = int(values[1])
  penalties.gap_open1 = int(values[2])
  penalties.gap_extend1 = int(values[3])
  penalties.gap_open2 = int(values[4])
  penalties.gap_extend2 = int(values[5])
else:
  print("[WFAReScoreAlignments] No distance provided. Using edit-distance (default)");
  
# Check penalties
if penalties.match > 0: 
  print("[WFACompareAlignments] Match penalty must be negative or zero")
  exit(-1)
if penalties.mismatch < 0 or \
   penalties.mismatch < 0 or \
   penalties.gap_open1 < 0 or \
   penalties.gap_extend1 < 0 or \
   penalties.gap_open2 < 0 or \
   penalties.gap_extend2 < 0:
  print("[WFACompareAlignments] Penalties must be positive or zero")
  exit(-1)
  
# Rescore alignments
rescore_alignments(args.input,args.output,penalties,args.verbose)