File: squizz_sequence.py

package info (click to toggle)
mobyle 1.5.5%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 8,288 kB
  • sloc: python: 22,709; makefile: 35; sh: 33; ansic: 10; xml: 6
file content (172 lines) | stat: -rw-r--r-- 6,666 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
########################################################################################
#                                                                                      #
#   Author: Bertrand Neron,                                                            #
#   Organization:'Biological Software and Databases' Group, Institut Pasteur, Paris.   #  
#   Distributed under GPLv2 Licence. Please refer to the COPYING.LIB document.         #
#                                                                                      #
########################################################################################

"""
This module is used as template to build a converter module
"""
import os
import re
from subprocess import Popen , PIPE
from logging import getLogger
_log = getLogger( __name__ )

from Mobyle.MobyleError import MobyleError , UnSupportedFormatError
from Mobyle.Converter.DataConverter import DataConverter

class squizz_sequence( DataConverter ):
    """
    Converter for sequence data which delegates format detection and
    conversion to the external 'squizz' program.

    The path of the squizz executable is read from self.path
    (presumably stored by DataConverter.__init__ -- confirm in the base class).
    """

    def __init__(self , path ):
        """
        @param path: the path of the squizz executable (handed to DataConverter)
        @type path: string
        """
        super( squizz_sequence , self ).__init__( path )
        self.program_name = 'squizz'


    def suffixe( self , format ):
        """
        @param format: the name of a sequence format
        @type format: string
        @return: the file extension for this format: a dot followed by the format name in lower case
        @rtype: string
        """
        return '.' + format.lower()


    def _run_squizz( self , cmde , stdout = None ):
        """
        run squizz and wait for its completion.
        @param cmde: the full command line ( executable followed by its arguments )
        @type cmde: list of strings
        @param stdout: where the standard output of squizz is sent ( None: inherited from this process )
        @type stdout: file object or None
        @return: the exit status of squizz and everything it wrote on its standard error
        @rtype: ( int , string )
        @raise MobyleError: if squizz cannot be started
        """
        try:
            squizz_pipe = Popen( cmde ,
                                 shell = False ,
                                 stdout = stdout ,
                                 stdin = None ,
                                 stderr = PIPE ,
                                 # text mode: the regular expressions applied to the
                                 # report are str patterns
                                 universal_newlines = True
                                 )
        except OSError as err :
            msg = "squizz exit abnormally: " + str( err )
            _log.critical( msg )
            raise MobyleError( msg )
        # communicate() drains stderr while waiting for the child. Calling wait()
        # first and reading the pipe afterwards can block forever as soon as
        # squizz writes more than the pipe buffer can hold.
        report = squizz_pipe.communicate()[1]
        return squizz_pipe.returncode , report


    def detect( self, dataFileName ):
        """
        detect the format of the data.
        @param dataFileName: the filename of the data which the format must be detected
        @type dataFileName: string
        @return: the format of this data and the number of entries found.
                 if the format cannot be detected , return ( None , None )
        @rtype: ( string , int )
        @raise MobyleError: if squizz is not configured , cannot be started or exits with an error
        """
        if self.path is None :
            _log.critical( "squizz path is not configured" )
            raise MobyleError( 'squizz is required to handle data Sequence but not configured' )

        returncode , report = self._run_squizz( [ self.path , "-Sn" , dataFileName ] )
        if returncode != 0:
            msg = report
            if re.search( "squizz: invalid option -- n" , report ):
                msg = "your squizz binary is too old. Please upgrade it"
                _log.critical( msg )
            raise MobyleError( msg )

        # the result is read from stderr: "...: <FORMAT> format, <N> entries."
        # Only the first matching line is taken into account. When nothing
        # matches ( including an empty report ) the format is undetected.
        for line in report.splitlines():
            match = re.search( r": (.+) format, (\d+) entries\.$" , line )
            if match :
                detected = match.group(1)
                if detected != "UNKNOWN":
                    return ( detected , int( match.group(2) ) )
                break
        return ( None , None )


    def detectedFormat(self):
        """
        @return: the list of detectables formats.
        @rtype: list of strings
        """
        return [ 'SWISSPROT' ,
                 'EMBL' ,
                 'GENBANK',
                 'CODATA',
                 'NBRF',
                 'GDE',
                 'IG',
                 'FASTA',
                 'GCG',
                 'RAW']


    def convert( self , dataFileName , outputFormat , inputFormat = None ):
        """
        convert a data in the format outputFormat
        @param dataFileName: the filename of the data to convert
        @type dataFileName: string
        @param outputFormat: the format in which the data must be convert in.
        @type outputFormat: string
        @param inputFormat: the format of the data. if not specified squizz is not given any input format
        @type inputFormat: string
        @return: the filename of the converted data: dataFileName with its extension replaced by suffixe( outputFormat )
        @rtype: string
        @raise UnSupportedFormatError: if the outputFormat is not supported, or if the data is in unsupported format.
        @raise MobyleError: if squizz is not configured , cannot be started , exits with
                            another error , or if the output file cannot be opened.
        """
        # same guard as in detect: fail with a Mobyle error instead of letting
        # Popen choke on a None executable
        if self.path is None :
            _log.critical( "squizz path is not configured" )
            raise MobyleError( 'squizz is required to handle data Sequence but not configured' )

        # NOTE(review): when dataFileName already ends with suffixe( outputFormat )
        # the output file IS the input file and opening it below truncates the
        # data before squizz reads it -- confirm callers never do that.
        outFileName = os.path.splitext( dataFileName )[0] + self.suffixe( outputFormat )
        cmde = [ self.path ,
                "-S" ,
                "-n" ,
                "-c" , outputFormat ]
        if inputFormat:
            cmde += [ "-f" , inputFormat ]
        cmde.append( dataFileName )

        try:
            outFile = open( outFileName , 'w' )
        except IOError as err :
            _log.error( "can't write outFile:" + str( err ) )
            raise MobyleError( "Sequence Convertion Error: "+ str( err ) )
        try:
            returncode , report = self._run_squizz( cmde , stdout = outFile )
        finally:
            # close the output file on every path , errors included
            outFile.close()

        if returncode != 0:
            msg = report
            if re.search( ".*: unsupported format" , report ):
                _log.error( msg )
                raise UnSupportedFormatError( msg )
            if re.search( "squizz: invalid option -- n" , report ):
                msg = "your squizz binary is too old. Please upgrade it"
                _log.critical( msg )
            raise MobyleError( msg )

        # on success the stderr report must end with "...: <FORMAT> format, <N> entries."
        match = re.search( r"(: \w+)?: (.+) format, (\d+) entries\.$" , report )
        if not match:
            raise UnSupportedFormatError( str( report ) )
        if match.group(2) == "UNKNOWN":
            # the format of the input data is not recognized
            raise UnSupportedFormatError
        return outFileName


    def convertedFormat(self):
        """
        @return: the list of allowed conversion ( inputFormat , outputFormat ):
                 every ordered pair of detectable formats.
        @rtype: [ ( string inputFormat, string outputFormat )  , ... ]
        """
        formats = self.detectedFormat()
        return [ ( inputFormat , outputFormat ) for inputFormat in formats for outputFormat in formats ]