File: support.py

package info (click to toggle)
chemfp 1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 1,580 kB
  • sloc: python: 9,390; ansic: 2,363; makefile: 110
file content (309 lines) | stat: -rw-r--r-- 12,044 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
import sys
import os
from cStringIO import StringIO
import tempfile

# Ignore the close. io.write_fps1_output() auto-closes its output.
class SIO(object):
    """In-memory text sink whose close() does nothing.

    Wraps a StringIO so the text written to it can still be fetched with
    getvalue() after a writer has called close() on it.
    """

    def __init__(self):
        self.sio = StringIO()

    def write(self, s):
        """Append the string `s` to the underlying buffer."""
        buf = self.sio
        return buf.write(s)

    def writelines(self, lines):
        """Append every string from the iterable `lines` to the buffer."""
        buf = self.sio
        return buf.writelines(lines)

    def close(self):
        """Deliberately a no-op so the captured text stays readable."""
        return None

    def getvalue(self):
        """Return everything written so far as a single string."""
        captured = self.sio.getvalue()
        return captured

# Given a filename in the "tests/" directory, return its full (absolute) path

# Anchor on the absolute location of this module. A bare
# os.path.dirname(__file__) can be relative (or even empty), which would make
# every returned path depend on the current working directory.
_dirname = os.path.dirname(os.path.abspath(__file__))
def fullpath(name):
    """Return the absolute path of `name` inside this module's directory.

    `name` is a filename relative to the directory containing this file.
    Raises AssertionError (carrying the computed path) if no such file
    exists, so a missing fixture is reported when it is first looked up.
    """
    path = os.path.join(_dirname, name)
    assert os.path.exists(path), path
    return path

# Fixture files, resolved (and checked for existence by fullpath) at import time.
# PUBCHEM_SDF is the default `source` argument of the Runner.run* methods.
PUBCHEM_SDF = fullpath("pubchem.sdf")
# presumably a gzip-compressed copy of pubchem.sdf -- confirm against the fixture
PUBCHEM_SDF_GZ = fullpath("pubchem.sdf.gz")
# A fixture whose filename does not end in a recognised structure-file extension.
PUBCHEM_ANOTHER_EXT = fullpath("pubchem.should_be_sdf_but_is_not")

# Fixture used by the title / id-tag error-handling tests in TestIdAndErrors.
MISSING_TITLE = fullpath("missing_title.sdf")

# Remember the interpreter's original standard streams. Runner temporarily
# replaces sys.stdout / sys.stderr and uses these to restore them afterwards.
real_stdin = sys.stdin
real_stdout = sys.stdout
real_stderr = sys.stderr

class Runner(object):
    """Run a command-line style ``main(args)`` callable and capture its output.

    `main` is called with a list of argument strings. While it runs,
    sys.stdout (and, for some helpers, sys.stderr) is replaced with an
    in-memory SIO buffer; the real streams are always restored afterwards.
    Subclasses may override pre_run(), post_run(), verify_result() and
    run_stdin().
    """
    def __init__(self, main):
        self.main = main

    def pre_run(self):
        """Hook called just before `main` runs. Does nothing by default."""
        pass
    def post_run(self):
        """Hook called after `main` returns normally. Does nothing by default."""
        pass

    def run(self, cmdline, source=PUBCHEM_SDF):
        """Run `main` and return the lines it wrote to stdout.

        cmdline -- either a string (split on whitespace) or a list/tuple of
                   argument strings
        source  -- appended as the final argument unless it is None

        Non-empty output is passed to verify_result() before being returned.
        Exceptions raised by `main` (including SystemExit) propagate; in
        that case post_run() is not called.
        """
        if isinstance(cmdline, basestring):
            args = cmdline.split()
        else:
            assert isinstance(cmdline, list) or isinstance(cmdline, tuple)
            # Copy into a list: a tuple cannot be concatenated with the
            # [source] list below, and the caller's sequence stays untouched.
            args = list(cmdline)
        if source is not None:
            args = args + [source]
        self.pre_run()

        try:
            sys.stdout = stdout = SIO()
            self.main(args)
        finally:
            sys.stdout = real_stdout

        self.post_run()

        result = stdout.getvalue().splitlines()
        if result:
            self.verify_result(result)
        return result

    def verify_result(self, result):
        """Check that the first output line is the "#FPS1" format marker."""
        assert result[0] == "#FPS1", result[0]
        # TODO: .. verify more of the line format ...

    def run_stdin(self, cmdline):
        """Run with input taken from stdin. Must be provided by a subclass."""
        raise NotImplementedError("Implement in the derived class")

    def run_fps(self, cmdline, expect_length=None, source=PUBCHEM_SDF):
        """Run and return only the lines that follow the leading "#" lines.

        If expect_length is not None, assert that exactly that many lines
        remain.
        """
        result = self.run(cmdline, source)
        # Guard against empty output (or output made up only of header
        # lines) so the loop stops instead of indexing an empty list.
        while result and result[0].startswith("#"):
            del result[0]
        if expect_length is not None:
            assert len(result) == expect_length, (len(result), expect_length)
        return result

    def run_split(self, cmdline, expect_length=None, source=PUBCHEM_SDF):
        "split into dict of headers and list of values"
        result = self.run(cmdline, source)
        headers = {}
        fps = []
        result_iter = iter(result)
        # I know the first line is correct (it was tested in verify_result)
        # Plus, this lets the SimsearchRunner use run_split
        # The default keeps empty output from raising StopIteration here.
        next(result_iter, None)
        for line in result_iter:
            if not line.startswith("#"):
                # First non-header line: everything from here on is data.
                fps.append(line)
                break
            # Header lines look like "#key=value"; the key keeps its "#".
            k, v = line.split("=", 1)
            assert k not in headers, k
            headers[k] = v
        fps.extend(result_iter)
        if expect_length is not None:
            assert len(fps) == expect_length, (len(fps), expect_length)
        return headers, fps


    def run_exit(self, cmdline, source=PUBCHEM_SDF):
        """Run a command that must raise SystemExit; return its stderr text.

        Raises AssertionError if `main` returns without raising SystemExit.
        """
        sys.stderr = stderr = SIO()
        try:
            try:
                self.run(cmdline, source)
            except SystemExit:
                pass
            else:
                raise AssertionError("should have exited: %r" % (cmdline,))
        finally:
            sys.stderr = real_stderr
        return stderr.getvalue()

    def run_split_capture(self, cmdline, expect_length=None, source=PUBCHEM_SDF):
        """Like run_split(), but also capture stderr.

        Returns (headers, fps, stderr_text). A SystemExit raised by `main`
        is turned into an AssertionError.
        """
        sys.stderr = stderr = SIO()
        try:
            try:
                headers, fps = self.run_split(cmdline, expect_length, source)
            except SystemExit:
                raise AssertionError("unexpected SystemExit")
        finally:
            sys.stderr = real_stderr
        return headers, fps, stderr.getvalue()
        

####

def can_skip(name):
    """Return True unless `name` is selected by $TOX_CHEMFP_TEST.

    `name` counts as selected when the variable's value starts with it or
    contains it directly after a comma. An unset variable is treated as
    the empty string.
    """
    selected = os.environ.get("TOX_CHEMFP_TEST", "")
    if selected.startswith(name):
        return False
    return ("," + name) not in selected

#### fingerprint encoding

def set_bit(n):
    """Return a 6-character hex string for 3 bytes with only bit `n` set.

    Bit 0 is the lowest bit of the first byte, bit 8 the lowest bit of the
    second byte, and so on. Asserts that n <= 16.
    """
    assert n <= 16
    byte_values = [0] * 3
    index, offset = divmod(n, 8)
    byte_values[index] = 1 << offset
    return "".join("%02x" % value for value in byte_values)

class TestIdAndErrors(object):
    """Shared tests for --id-tag and --errors handling.

    This is a mixin: the concrete class must supply `self._runner` (used
    here through run_exit, run_split and run_split_capture) and the
    assertIn / assertNotIn / assertEqual methods.
    """
    #
    # One of the records doesn't have an XLOGP field
    #
    # NOTE(review): three tests in this group used to share their names with
    # the "Blank" id tag tests further down, so the later definitions silently
    # replaced them and they never ran. They are renamed to match the
    # existing test_missing_id_strict.
    def test_missing_id(self):
        errmsg = self._runner.run_exit("--id-tag PUBCHEM_CACTVS_XLOGP")
        self.assertIn("ERROR: Missing id tag 'PUBCHEM_CACTVS_XLOGP' for record #7 ", errmsg)
        self.assertIn("pubchem.sdf", errmsg)

    # Should be the same as the previous code.
    def test_missing_id_strict(self):
        errmsg = self._runner.run_exit("--id-tag PUBCHEM_CACTVS_XLOGP --errors strict")
        self.assertIn("ERROR: Missing id tag 'PUBCHEM_CACTVS_XLOGP' for record #7 ", errmsg)
        self.assertIn("pubchem.sdf", errmsg)


    def test_missing_id_report(self):
        headers, fps, errmsg = self._runner.run_split_capture("--id-tag PUBCHEM_CACTVS_XLOGP --errors report", 18)
        # NOTE(review): the shadowed original checked for the "Missing title"
        # message and "missing_title.sdf", copied from the title tests even
        # though this run reads pubchem.sdf. The expectations below follow
        # the message pinned by test_missing_id_strict -- confirm that
        # "report" mode prints the same text.
        self.assertIn("ERROR: Missing id tag 'PUBCHEM_CACTVS_XLOGP' for record #7", errmsg)
        self.assertIn("pubchem.sdf", errmsg)
        # NOTE(review): the original asserted fps[-1] == "", which could only
        # hold if the output ended with a blank line; checking the number of
        # surviving records instead -- confirm the intent.
        self.assertEqual(len(fps), 18)

    def test_missing_id_ignore(self):
        headers, fps, errmsg = self._runner.run_split_capture("--id-tag PUBCHEM_CACTVS_XLOGP --errors ignore", 18)
        # NOTE(review): the shadowed original checked for the absence of the
        # "Missing title" message, which can never appear for this input.
        self.assertNotIn("ERROR: Missing id tag 'PUBCHEM_CACTVS_XLOGP'", errmsg)
        ids = [fp.split("\t")[1] for fp in fps]
        self.assertEqual(ids, ['2.8', '1.9', '1', '3.3', '1.5', '2.6', '-0.9', '2', '2.1',
                               '2.9', '1.7', '-1.5', '0.4', '0.6', '0.4', '0.4', '2', '2.5'])


    #
    # Various ways of having a strange title
    #

    def test_missing_title(self):
        errmsg = self._runner.run_exit("", MISSING_TITLE)
        self.assertIn("ERROR: Missing title for record #1", errmsg)

    def test_missing_title_strict(self):
        errmsg = self._runner.run_exit("--errors strict", MISSING_TITLE)
        self.assertIn("ERROR: Missing title for record #1", errmsg)

    def test_missing_title_report(self):
        headers, fps, errmsg = self._runner.run_split_capture("--errors report", 1, MISSING_TITLE)
        self.assertIn("ERROR: Missing title for record #1", errmsg)
        self.assertNotIn("ERROR: Missing title for record #2", errmsg)
        self.assertIn("ERROR: Missing title for record #3", errmsg)
        self.assertEqual(len(fps), 1)
        self.assertEqual(fps[0].split("\t")[1], "Good")

    def test_missing_title_ignore(self):
        headers, fps, errmsg = self._runner.run_split_capture("--errors ignore", 1, MISSING_TITLE)
        self.assertNotIn("ERROR: Missing title for record #1", errmsg)
        self.assertNotIn("ERROR: Missing title for record #2", errmsg)
        self.assertNotIn("ERROR: Missing title for record #3", errmsg)
        self.assertEqual(len(fps), 1)
        self.assertEqual(fps[0].split("\t")[1], "Good")

    #
    # Various ways of handling a missing id in a tag
    #

    def test_missing_id_tag(self):
        errmsg = self._runner.run_exit("--id-tag Blank", MISSING_TITLE)
        self.assertIn("ERROR: Empty id tag 'Blank' for record #1", errmsg)

    def test_missing_id_tag_strict(self):
        errmsg = self._runner.run_exit("--id-tag Blank --errors strict", MISSING_TITLE)
        self.assertIn("ERROR: Empty id tag 'Blank' for record #1", errmsg)
        self.assertIn("missing_title.sdf", errmsg)

    def test_missing_id_tag_report(self):
        headers, fps, errmsg = self._runner.run_split_capture("--id-tag Blank --errors report", 1, MISSING_TITLE)
        self.assertIn("ERROR: Empty id tag 'Blank' for record #1", errmsg)
        self.assertIn("ERROR: Empty id tag 'Blank' for record #2", errmsg)
        self.assertNotIn("ERROR: Empty id tag 'Blank' for record #3", errmsg)
        self.assertEqual(fps[0].split("\t")[1], "This is not Blank")

    def test_missing_id_tag_ignore(self):
        headers, fps, errmsg = self._runner.run_split_capture("--id-tag Blank --errors ignore", 1, MISSING_TITLE)
        self.assertNotIn("ERROR: Empty id tag 'Blank' for record #1", errmsg)
        self.assertNotIn("ERROR: Empty id tag 'Blank' for record #2", errmsg)
        self.assertNotIn("ERROR: Empty id tag 'Blank' for record #3", errmsg)
        self.assertEqual(fps[0].split("\t")[1], "This is not Blank")

    #
    # Various ways of handling tab characters in an id tag
    #

    def test_tab_id_tag(self):
        errmsg = self._runner.run_exit("--id-tag Tab", MISSING_TITLE)
        self.assertIn("ERROR: Empty id tag 'Tab' for record #2", errmsg)

    def test_tab_id_tag_strict(self):
        errmsg = self._runner.run_exit("--id-tag Tab --errors strict", MISSING_TITLE)
        self.assertIn("ERROR: Empty id tag 'Tab' for record #2", errmsg)
        self.assertIn("missing_title.sdf", errmsg)

    def test_tab_id_tag_report(self):
        headers, fps, errmsg = self._runner.run_split_capture("--id-tag Tab --errors report", 2, MISSING_TITLE)
        self.assertIn("ERROR: Empty id tag 'Tab' for record #2", errmsg)
        self.assertEqual(fps[0].split("\t")[1], "Leading tab")
        self.assertEqual(fps[1].split("\t")[1], "This does not")

    def test_tab_id_tag_ignore(self):
        headers, fps, errmsg = self._runner.run_split_capture("--id-tag Tab --errors ignore", 2, MISSING_TITLE)
        self.assertNotIn("ERROR: Empty id tag 'Tab'", errmsg)
        self.assertEqual(fps[0].split("\t")[1], "Leading tab")
        self.assertEqual(fps[1].split("\t")[1], "This does not")


    def test_contains_tab_id_tag(self):
        headers, fps = self._runner.run_split("--id-tag ContainsTab", 3, MISSING_TITLE)
        ids = [fp.split("\t")[1] for fp in fps]
        self.assertEqual(ids, ["ThreeTabs", "tabseparated", "twotabs"])

    def test_contains_tab_id_tag_strict(self):
        headers, fps = self._runner.run_split("--id-tag ContainsTab --errors strict", 3, MISSING_TITLE)
        ids = [fp.split("\t")[1] for fp in fps]
        self.assertEqual(ids, ["ThreeTabs", "tabseparated", "twotabs"])

    def test_contains_tab_id_tag_report(self):
        headers, fps, errmsg = self._runner.run_split_capture("--id-tag ContainsTab --errors report", 3, MISSING_TITLE)
        self.assertNotIn("ContainsTab", errmsg)
        self.assertNotIn("ERROR", errmsg)
        ids = [fp.split("\t")[1] for fp in fps]
        self.assertEqual(ids, ["ThreeTabs", "tabseparated", "twotabs"])

    def test_contains_tab_id_tag_ignore(self):
        headers, fps, errmsg = self._runner.run_split_capture("--id-tag ContainsTab --errors ignore", 3, MISSING_TITLE)
        self.assertNotIn("ERROR: Empty id tag 'ContainsTab'", errmsg)
        ids = [fp.split("\t")[1] for fp in fps]
        self.assertEqual(ids, ["ThreeTabs", "tabseparated", "twotabs"])

    #
    # Handling bad files
    #

    def test_handles_missing_filename(self):
        # The missing file comes first on the command line; the default
        # source argument appends pubchem.sdf after it.
        errmsg = self._runner.run_exit("this_file_does_not_exist.sdf", PUBCHEM_SDF)
        self.assertIn("Structure file '", errmsg)
        self.assertIn("this_file_does_not_exist.sdf", errmsg)
        self.assertIn("' does not exist", errmsg)
        self.assertNotIn("pubchem", errmsg)

    def test_handles_missing_filename_at_end(self):
        errmsg = self._runner.run_exit([PUBCHEM_SDF, "this_file_does_not_exist.sdf"])
        self.assertIn("Structure file '", errmsg)
        self.assertIn("this_file_does_not_exist.sdf", errmsg)
        self.assertIn("' does not exist", errmsg)
        self.assertNotIn("pubchem", errmsg)

    def test_unreadable_file(self):
        tf = tempfile.NamedTemporaryFile(suffix="unreadable.sdf")
        try:
            # Write-only permissions. The 0o prefix is the octal spelling
            # accepted by both Python 2.6+ and Python 3.
            os.chmod(tf.name, 0o222)
            errmsg = self._runner.run_exit([PUBCHEM_SDF, tf.name])
            self.assertIn("Problem reading structure fingerprints", errmsg)
            self.assertIn("unreadable.sdf", errmsg)
            self.assertNotIn("pubchem", errmsg)
        finally:
            tf.close()