File: squizz_alignment.py

package info (click to toggle)
mobyle 1.5.5%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 8,288 kB
  • sloc: python: 22,709; makefile: 35; sh: 33; ansic: 10; xml: 6
file content (177 lines) | stat: -rw-r--r-- 6,930 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
########################################################################################
#                                                                                      #
#   Author: Sandrine Larroude                                                          #
#   Organization:'Biological Software and Databases' Group, Institut Pasteur, Paris.   #  
#   Distributed under GPLv2 Licence. Please refer to the COPYING.LIB document.         #
#                                                                                      #
########################################################################################

"""
This is a squizz converter module for alignment
"""

import os, re
from subprocess import Popen , PIPE

from logging import getLogger
s_log = getLogger(__name__)

from Mobyle.MobyleError import MobyleError , UnSupportedFormatError
from Mobyle.Converter.DataConverter import DataConverter

class squizz_alignment( DataConverter ):
    """
    Alignment data converter which delegates the work to the external
    'squizz' program, run as a subprocess.

    The path of the squizz binary is taken from self.path.
    """

    # Pattern searched for in each line squizz writes on stderr when called
    # with "-An": "<...>: <FORMAT> format, <N> entries."
    _REPORT_RE = re.compile( r": (.+) format, (\d+) entries\.$" )

    # Diagnostic emitted when the binary does not know the "-n" option.
    # Accepts "invalid option -n", "invalid option -- n" and
    # "invalid option -- 'n'" so that detect() and convert() agree.
    _OLD_BINARY_RE = re.compile( r"squizz: invalid option -(?:- ?'?)?n" )

    def __init__( self , path ):
        """
        @param path: the path of the squizz binary (None if not configured)
        @type path: string
        """
        super( squizz_alignment , self ).__init__( path )
        self.program_name = 'squizz'

    def _configured_path( self ):
        """
        @return: the path of the squizz binary.
        @rtype: string
        @raise MobyleError: if no path is configured (self.path is None).
        """
        if self.path is None:
            s_log.critical( "squizz path is not configured." )
            raise MobyleError( 'squizz is required to handle Alignment data but not configured.(See section SEQCONVERTER on Config.py)' )
        return self.path

    def detect( self , dataFileName ):
        """
        detect the format of the data.
        @param dataFileName: the filename of the data which the format must be detected
        @type dataFileName: string
        @return: the format of this data and the number of entry.
               if the format cannot be detected (no report line found, or
               squizz reports UNKNOWN), return ( None , None )
        @rtype: ( string format , int number of entry )
        @raise MobyleError: if squizz is not configured, cannot be started
               or exits with a non zero status.
        """
        squizz_path = self._configured_path()

        try:
            squizz_pipe = Popen( [ squizz_path , "-An" , dataFileName ] ,
                                 shell = False ,
                                 stdout = None ,
                                 stdin = None ,
                                 stderr = PIPE ,
                                 # text mode: stderr is a str on python 2 and 3
                                 universal_newlines = True
                                 )
        except OSError as err:
            msg = "squizz exit abnormally: " + str( err )
            s_log.critical( msg )
            raise MobyleError( msg )

        # communicate() drains stderr while waiting for the process;
        # wait() followed by a read can block forever once the pipe is full.
        _ , report = squizz_pipe.communicate()

        if squizz_pipe.returncode != 0:
            msg = report
            if self._OLD_BINARY_RE.search( msg ):
                msg = "Your squizz binary is too old. Please upgrade it."
                s_log.critical( msg )
            raise MobyleError( msg )

        detected , nb = None , None
        for line in report.splitlines():
            match = self._REPORT_RE.search( line )
            if match:
                detected = match.group( 1 )
                nb = int( match.group( 2 ) )
                break

        if detected == "UNKNOWN":
            detected , nb = None , None

        return ( detected , nb )

    def detectedFormat( self ):
        """
        @return: the list of detectable formats.
        @rtype: list of strings
        """
        return [ 'CLUSTAL' ,
                 'PHYLIP' ,
                 'PHYLIPI' ,
                 'PHYLIPS' ,
                 'FASTA' ,
                 'MEGA' ,
                 'MSF' ,
                 'NEXUS' ,
                 'STOCKHOLM' ,
                 ]

    def convert( self , dataFileName , outputFormat , inputFormat = None ):
        """
        convert a data in the format outputFormat
        @param dataFileName: the filename of the data to convert
        @type dataFileName: string
        @param outputFormat: the format in which the data must be convert in.
        @type outputFormat: string
        @param inputFormat: the format of the data. If given, it is passed
               to squizz with the "-f" option.
        @type inputFormat: string
        @return: the filename of the converted data: the input filename
               with its extension replaced by the lower-cased outputFormat.
        @rtype: string
        @raise UnSupportedFormatError: if the outputFormat is not supported, or if the data is in unsupported format.
        @raise MobyleError: if squizz is not configured, cannot be started,
               fails for another reason, or if the output file cannot be opened.
        """
        squizz_path = self._configured_path()

        outFileName = os.path.splitext( dataFileName )[0] + "." + outputFormat.lower()
        # NOTE(review): if dataFileName already ends with this extension,
        # outFileName is the input file itself and opening it in 'w' mode
        # truncates it before squizz reads it. Behaviour kept as is.
        try:
            outFile = open( outFileName , 'w' )
        except IOError as err:
            s_log.error( "Can't write outFile:" + str( err ) )
            raise MobyleError( "Alignment Conversion Error: " + str( err ) )

        # whatever happens below, the output file handle must be released.
        try:
            # number of entries is also returned but not useful
            det_format , _ = self.detect( dataFileName )
            if not det_format:
                # the input format is not recognized
                raise UnSupportedFormatError()

            # Command building
            cmde = [ squizz_path , "-c" , outputFormat ]
            if inputFormat:
                cmde += [ "-f" , inputFormat ]
            cmde.append( dataFileName )

            try:
                squizz_pipe = Popen( cmde ,
                                     shell = False ,
                                     stdout = outFile ,
                                     stdin = None ,
                                     stderr = PIPE ,
                                     # text mode: stderr is a str on python 2 and 3
                                     universal_newlines = True
                                     )
            except OSError as err:
                msg = "squizz exit abnormally: " + str( err )
                s_log.critical( msg )
                raise MobyleError( msg )

            # communicate() rather than wait(): avoids blocking on a full
            # stderr pipe.
            _ , err_text = squizz_pipe.communicate()
        finally:
            outFile.close()

        if squizz_pipe.returncode != 0:
            msg = err_text
            if re.search( ".*: unsupported format" , err_text ):
                s_log.error( msg )
                raise UnSupportedFormatError( msg )
            if self._OLD_BINARY_RE.search( err_text ):
                msg = "your squizz binary is too old. Please upgrade it"
                s_log.critical( msg )
            raise MobyleError( msg )

        return outFileName

    def convertedFormat( self ):
        """
        @return: the list of allowed conversion ( inputFormat , outputFormat ),
               i.e. every ordered pair of detectable formats.
        @rtype: [ ( string inputFormat, string outputFormat )  , ... ]
        """
        formats = self.detectedFormat()
        return [ ( inputFormat , outputFormat )
                 for inputFormat in formats
                 for outputFormat in formats ]