File: test_isrcsubmit.py

package info (click to toggle)
isrcsubmit 2.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 508 kB
  • sloc: python: 1,247; makefile: 143; sh: 2
file content (383 lines) | stat: -rwxr-xr-x 12,062 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
#!/usr/bin/env python
# Copyright (C) 2014  Johannes Dewender
# This test is free. You can redistribute and/or modify it at will.

import os
import re
import sys
import math
import json
import pickle
import unittest
from io import TextIOWrapper, BytesIO
from subprocess import Popen

import musicbrainzngs
import isrcsubmit


try:
    import discid
    from discid import DiscError
except ImportError:
    from libdiscid.compat import discid
    from libdiscid.compat.discid import DiscError

SCRIPT_NAME = "isrcsubmit.py"
TEST_DATA = "test_data/"
SAVE_RUN = False


class TestInternal(unittest.TestCase):
    def setUp(self):
        # suppress output
        with open(os.devnull, 'w') as devnull:
            self._old_stdout = os.dup(sys.stdout.fileno())
            os.dup2(devnull.fileno(), 1)

    def test_encoding(self):
        self.assertTrue(type(isrcsubmit.encode("test")) is type(b"test"))
        self.assertEqual(isrcsubmit.encode("test"), b"test")
        self.assertTrue(type(isrcsubmit.decode(b"test"))
                        is type(b"test".decode()))
        self.assertEqual(isrcsubmit.decode(b"test"), "test")

        string = "test"
        self.assertEqual(isrcsubmit.decode(isrcsubmit.encode(string)),
                         string)
        bytestring = b"test"
        self.assertEqual(isrcsubmit.encode(isrcsubmit.decode(bytestring)),
                         bytestring)

    def test_gather_options(self):
        # make sure most important options always work
        options = isrcsubmit.gather_options([SCRIPT_NAME])
        self.assertFalse(options.debug)
        self.assertTrue(options.backend)
        self.assertEqual(options.server, "musicbrainz.org")
        self.assertFalse(options.force_submit)
        self.assertTrue(options.release_id is None)

        user = "JonnyJD"
        device = "/some/other/device"
        options = isrcsubmit.gather_options([SCRIPT_NAME, user, device])
        self.assertEqual(options.user, user)
        self.assertEqual(options.device, device)
        options = isrcsubmit.gather_options([SCRIPT_NAME,
                                             "-d", device, "-u", user])
        self.assertEqual(options.user, user)
        self.assertEqual(options.device, device)
        options = isrcsubmit.gather_options([SCRIPT_NAME, "--user", user,
                                             "--device", device])
        self.assertEqual(options.user, user)
        self.assertEqual(options.device, device)

    def tearDown(self):
        # restore output
        os.dup2(self._old_stdout, 1)


# mock musicbrainzngs queries
# - - - - - - - - - - - - - -

# save mbngs functions
_mbngs_get_releases_by_discid = musicbrainzngs.get_releases_by_discid
_mbngs_get_release_by_id = musicbrainzngs.get_release_by_id
_mbngs_submit_isrcs = musicbrainzngs.submit_isrcs

def _get_releases_by_discid(disc_id, includes=[]):
    file_name = "%s%s_releases.json" % (TEST_DATA, disc_id)
    if SAVE_RUN:
        releases = _mbngs_get_releases_by_discid(disc_id, includes)
        with open(file_name, "w") as releases_file:
            json.dump(releases, releases_file, indent=2)
        return releases
    else:
        with open(file_name, "r") as releases_file:
            return json.load(releases_file)

musicbrainzngs.get_releases_by_discid = _get_releases_by_discid

def _get_release_by_id(release_id, includes=[]):
    file_name = "%s%s.json" % (TEST_DATA, release_id)
    if SAVE_RUN:
        release = _mbngs_get_release_by_id(release_id, includes)
        with open(file_name, "w") as release_file:
            json.dump(release, releases_file, indent=2)
        return releases
    else:
        with open(file_name, "r") as release_file:
            return json.load(release_file)

musicbrainzngs.get_release_by_id = _get_release_by_id

def _submit_isrcs(tracks2isrcs):
    global data_sent
    data_sent.tracks2isrcs = tracks2isrcs
    return True

musicbrainzngs.submit_isrcs = _submit_isrcs


# mock discid reading
# - - - - - - - - - -

class MockedTrack(object):
    def __init__(self, track):
        self.number = track.number
        self.isrc = track.isrc

class MockedDisc(object):
    def __init__(self, disc):
        self.id = disc.id
        self.submission_url = disc.submission_url
        self.mcn = disc.mcn
        tracks = []
        for track in disc.tracks:
            tracks.append(MockedTrack(track))
        self.tracks = tracks

# save discid functions
_discid_read = discid.read

def _read(device=None, features=[]):
    if SAVE_RUN:
        # always read all features to save full libdiscid information
        disc = _discid_read(device, ["read", "mcn", "isrc"])
        file_name = "%s%s.pickle" % (TEST_DATA, disc.id)
        with open(file_name, "wb") as disc_file:
            pickle.dump(MockedDisc(disc), disc_file, 2)
        return disc
    else:
        file_name = "%s%s.pickle" % (TEST_DATA, mocked_disc_id)
        with open(file_name, "rb") as disc_file:
            return pickle.load(disc_file)

isrcsubmit.discid.read = _read


# mock cdrdao reading
# - - - - - - - - - -

class _Popen(Popen):
    def __new__(cls, args, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr):
        if args[0] == "cdrdao":
            file_name = "%s%s_cdrdao.toc" % (TEST_DATA, mocked_disc_id)
            if SAVE_RUN:
                # save file to a different place
                args[-1] = file_name
                # delete file so cdrdao doesn't complain it's already there
                os.remove(file_name)
            else:
                # don't actually call cdrdao
                args = ["echo", "mocked cdrdao"]
        return Popen(args, stdin=stdin, stdout=stdout, stderr=stderr)

isrcsubmit.Popen = _Popen

def _open(name, mode):
    if re.search(r"cdrdao-.*\.toc", name):
        name = "%s%s_cdrdao.toc" % (TEST_DATA, mocked_disc_id)
    return open(name, mode)

isrcsubmit.open = _open

# general mocking
# - - - - - - - -

_isrcsubmit_has_program = isrcsubmit.has_program
_isrcsubmit_get_prog_version = isrcsubmit.get_prog_version

def _has_program(program, strict=False):
    if program == "libdiscid":
        # we mock it anyways
        # libdiscid >= 0.2.2 still needed to load discid
        return True
    elif program == "cdrdao":
        # also mocked
        return True
    else:
        return _isrcsubmit_has_program(program, strict)

def _get_prog_version(prog):
    if prog == "libdiscid":
        version = "mocked libdiscid"
    elif prog == "cdrdao":
        version = "mocked cdrdao"
    else:
        return _isrcsubmit_get_prog_version(prog)
    return isrcsubmit.decode(version)

isrcsubmit.has_program = _has_program
isrcsubmit.get_prog_version = _get_prog_version


# mock answers given by user
# - - - - - - - - - - - - -

def append_to_stdin(msg):
    position = sys.stdin.tell()
    sys.stdin.write(msg)
    sys.stdin.seek(position)

def answer_on_stdin(answer, default=True):
    if answer == default:
        append_to_stdin("\n")
    elif answer:
        append_to_stdin("y\n")
    else:
        append_to_stdin("n\n")

def handle_question(string):
    global last_question
    question = False
    default = False

    # these questions can be handled right now
    if "username" in string:
        append_to_stdin("invalid_username\n")   # doesn't matter, mocked
    elif "password" in string:
        append_to_stdin("invalid_password\n")   # doesn't matter, mocked
    elif "Which one do you want?" in string:
        try:
            append_to_stdin("%d\n" % answers["choice"])
        except KeyError:
            append_to_stdin("1\n")
    elif "press <return>" in string:
        append_to_stdin("\n")

    # these questions use multiple writes
    if "submit" in string and "disc" in string:
        last_question = "submit_disc"
    elif "help clean" in string:
        last_question = "clean"
    elif "ISRC in browser" in string:
        last_question = "open_isrc"
    # question and prompt can be on different writes
    if "[y/N]" in string: question = True; default = False
    if "[Y/n]" in string: question = True; default = True

    if question:
        try:
            answer = answers[last_question]
        except KeyError:
            answer = default
        answer_on_stdin(answer, default)


class SmartStdin(TextIOWrapper):
    def write(self, string):
        if type(string) == bytes:
            string = string.decode()
        return super(type(self), self).write(string)

class SmartStdout(TextIOWrapper):
    def write(self, string):
        handle_question(string)
        # using "except TypeError" and the buffer would be nice
        # but exceptions don't seem to work here in Python 2.6
        # additionally TexIOWrapper doesn't have "buffer" in 2.6
        if type(string) == bytes:
            string = string.decode()
        return super(type(self), self).write(string)


# the actual tests of the overall script
# - - - - - - - - - - - - - - - - - - -

class TestScript(unittest.TestCase):
    def setUp(self):
        global answers, data_sent, mocked_disc_id
        global last_question

        # make sure globals are unset
        answers = data_sent = {}
        mocked_disc_id = last_question = None

        # gather output
        self._old_stdout = sys.stdout
        self._stdout = SmartStdout(BytesIO(), sys.stdout.encoding)
        sys.stdout = self._stdout
        self._old_stdin = sys.stdin
        self._stdin = SmartStdin(BytesIO(), sys.stdin.encoding)
        sys.stdin = self._stdin

    def _output(self):
        sys.stdout.seek(0)
        return sys.stdout.read()

    def assert_output(self, string):
        self.assertTrue(string in self._output().strip())

    def _debug(self):
        return sys.stderr.write(self._output())

    def test_version(self):
        try:
            isrcsubmit.main([SCRIPT_NAME, "--version"])
        except SystemExit:
            pass
        finally:
            self.assertTrue(isrcsubmit.__version__ in self._output().strip())

    def test_help(self):
        try:
            isrcsubmit.main([SCRIPT_NAME, "-h"])
        except SystemExit:
            pass
        finally:
            self.assertTrue(self._output().strip())

    def test_libdiscid(self):
        global mocked_disc_id
        mocked_disc_id = "TqvKjMu7dMliSfmVEBtrL7sBSno-"
        # we use defaults to questions -> no settings here
        try:
            isrcsubmit.main([SCRIPT_NAME, "--backend", "libdiscid"])
        except SystemExit:
            pass
        finally:
            self.assertTrue(isrcsubmit.__version__ in self._output().strip())
            self.assert_output("mocked libdiscid")
            self.assert_output("TqvKjMu7dMliSfmVEBtrL7sBSno-")
            self.assert_output("07090529-0fbf-4bd3-adc4-fe627343976d")
            self.assert_output("submit the disc?")
            self.assert_output("DEC680000220 is already attached to track 4")
            self.assert_output("No new ISRCs")

    def test_cdrdao(self):
        global mocked_disc_id
        mocked_disc_id = "hSI7B4G4AkB5.DEBcW.3KCn.D_E-"
        answers["choice"] = 1
        try:
            isrcsubmit.main([SCRIPT_NAME, "--backend", "cdrdao", "--device", "/dev/cdrw"])
        except SystemExit:
            pass
        finally:
            self.assertTrue(isrcsubmit.__version__ in self._output().strip())
            self.assert_output("mocked cdrdao")
            self.assert_output("hSI7B4G4AkB5.DEBcW.3KCn.D_E-")
            self.assert_output("none of these")
            self.assert_output("174a5513-73d1-3c9d-a316-3c1c179e35f8")
            self.assert_output("GBBBN7902023 is already attached to track 7")
            self.assert_output("No new ISRCs")

    def tearDown(self):
        # restore output
        sys.stdout = self._old_stdout
        self._stdout.close()
        sys.stdin = self._old_stdin



class TestDisc(unittest.TestCase):
    """Test reading the disc currently in the drive
    """
    pass


if __name__ == "__main__":
    unittest.main()


# vim:set shiftwidth=4 smarttab expandtab: