File: test_utils.py

package info (click to toggle)
fabric 1.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 1,240 kB
  • sloc: python: 7,363; makefile: 10
file content (264 lines) | stat: -rw-r--r-- 7,926 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
from __future__ import with_statement

import sys
from unittest import TestCase

from fudge import Fake, patched_context, with_fakes
from fudge.patcher import with_patched_object
from nose.tools import eq_, raises

from fabric.state import output
from fabric.utils import warn, indent, abort, puts, fastprint, error, RingBuffer
from fabric import utils  # For patching
from fabric.api import local, quiet
from fabric.context_managers import settings, hide
from fabric.colors import magenta, red
from utils import mock_streams, aborts, FabricTest, assert_contains, \
    assert_not_contains


def test_indent():
    # Generator-style test: each yielded (callable, actual, expected) tuple
    # is executed as its own check. Both cases expect every line to come
    # back prefixed with four spaces.
    cases = [
        ("Sanity check: 1 line string", 'Test', '    Test'),
        ("List of strings turns in to strings joined by \\n",
            ["Test", "Test"], '    Test\n    Test'),
    ]
    for label, given, expected in cases:
        # Label the generated case, then drop the attribute again once the
        # generator is resumed so it does not stick to the shared eq_ helper.
        eq_.description = "indent(): " + label
        yield eq_, indent(given), expected
        del eq_.description


def test_indent_with_strip():
    # Generator-style test for indent(..., strip=True): each yielded
    # (callable, actual, expected) tuple is executed as its own check.
    #
    # The table holds the raw inputs; indent() is invoked per case at yield
    # time, mirroring test_indent(), instead of while the table is built.
    for description, input_, output_ in (
        ("Sanity check: 1 line string",
            'Test', '    Test'),
        ("Check list of strings",
            ["Test", "Test"], '    Test\n    Test'),
        # Distinct label so a failure here is not confused with the case
        # above; the inputs carry leading whitespace that should be dropped
        # before the four-space indent is applied.
        ("Check list of strings with leading whitespace",
            ["        Test", "        Test"], '    Test\n    Test'),
    ):
        eq_.description = "indent(strip=True): %s" % description
        yield eq_, indent(input_, strip=True), output_
        del eq_.description


@aborts
def test_abort():
    """
    abort() is expected to terminate by raising SystemExit
    """
    message = "Test"
    abort(message)

class TestException(Exception):
    """Marker exception handed to abort() through ``abort_exception``."""

@raises(TestException)
def test_abort_with_exception():
    """
    abort() should raise the exception class set as ``abort_exception``
    """
    custom_exception = settings(abort_exception=TestException)
    with custom_exception:
        abort("Test")

@mock_streams('stdout')
def test_puts_with_user_output_on():
    """
    puts() should print input to sys.stdout if "user" output level is on
    """
    s = "string!"
    # ``output`` is shared, module-level state: remember the current "user"
    # level and put it back afterwards so this test cannot influence others.
    previous = output.user
    output.user = True
    try:
        puts(s, show_prefix=False)
        eq_(sys.stdout.getvalue(), s + "\n")
    finally:
        output.user = previous

@mock_streams('stdout')
def test_puts_with_unicode_output():
    """
    puts() should print unicode input
    """
    s = u"string!"
    # ``output`` is shared, module-level state: remember the current "user"
    # level and put it back afterwards so this test cannot influence others.
    previous = output.user
    output.user = True
    try:
        puts(s, show_prefix=False)
        eq_(sys.stdout.getvalue(), s + "\n")
    finally:
        output.user = previous


@mock_streams('stdout')
def test_puts_with_encoding_type_none_output():
    """
    puts() should print unicode output without a stream encoding
    """
    s = u"string!"
    # ``output`` is shared, module-level state: remember the current "user"
    # level and put it back afterwards so this test cannot influence others.
    previous = output.user
    output.user = True
    try:
        # Simulate a stream that reports no encoding; sys.stdout is the
        # stand-in installed by @mock_streams, not the real stream.
        sys.stdout.encoding = None
        puts(s, show_prefix=False)
        eq_(sys.stdout.getvalue(), s + "\n")
    finally:
        output.user = previous

@mock_streams('stdout')
def test_puts_with_user_output_off():
    """
    puts() shouldn't print input to sys.stdout if "user" output level is off
    """
    # Switch the "user" level off only for the duration of the call; assigning
    # output.user = False directly would leave it off for every later test.
    with hide('user'):
        puts("You aren't reading this.")
    eq_(sys.stdout.getvalue(), "")


@mock_streams('stdout')
def test_puts_with_prefix():
    """
    puts() should put "[<env.host_string>] " in front of its output
    """
    host = "localhost"
    text = "my output"
    with settings(host_string=host):
        puts(text)
    expected = "[" + host + "] " + text + "\n"
    eq_(sys.stdout.getvalue(), expected)


@mock_streams('stdout')
def test_puts_without_prefix():
    """
    puts() should emit only the text and a newline when show_prefix is False
    """
    text = "my output"
    puts(text, show_prefix=False)
    printed = sys.stdout.getvalue()
    eq_(printed, text + "\n")

@with_fakes
def test_fastprint_calls_puts():
    """
    fastprint() should hand off to puts() with no prefix, an empty line
    ending and flushing enabled
    """
    message = "Some output"
    expected_kwargs = dict(
        text=message, show_prefix=False, end="", flush=True
    )
    puts_double = Fake('puts', expect_call=True).with_args(**expected_kwargs)
    # Swap utils.puts for the expectation-carrying fake only while
    # fastprint() runs.
    with patched_context(utils, 'puts', puts_double):
        fastprint(message)


class TestErrorHandling(FabricTest):
    """
    Tests for error(): which callable it dispatches to, and what ends up on
    the (mocked) output streams.
    """
    dummy_string = 'test1234!'

    @with_patched_object(utils, 'warn', Fake('warn', callable=True,
        expect_call=True))
    def test_error_warns_if_warn_only_True_and_func_None(self):
        """
        warn_only=True, error(func=None) => calls warn()
        """
        with settings(warn_only=True):
            error('foo')

    @with_patched_object(utils, 'abort', Fake('abort', callable=True,
        expect_call=True))
    def test_error_aborts_if_warn_only_False_and_func_None(self):
        """
        warn_only=False, error(func=None) => calls abort()
        """
        with settings(warn_only=False):
            error('foo')

    def test_error_calls_given_func_if_func_not_None(self):
        """
        error(func=callable) => calls callable()
        """
        error('foo', func=Fake(callable=True, expect_call=True))

    @mock_streams('stdout')
    @with_patched_object(utils, 'abort', Fake('abort', callable=True,
        expect_call=True).calls(lambda x: sys.stdout.write(x + "\n")))
    def test_error_includes_stdout_if_given_and_hidden(self):
        """
        error() correctly prints stdout if it was previously hidden
        """
        # Mostly to catch regression bug(s)
        # The patched abort() just echoes its message to the mocked stdout,
        # so whatever error() put into the message can be inspected there.
        stdout = "this is my stdout"
        with hide('stdout'):
            error("error message", func=utils.abort, stdout=stdout)
        assert_contains(stdout, sys.stdout.getvalue())

    @mock_streams('stderr')
    @with_patched_object(utils, 'abort', Fake('abort', callable=True,
        expect_call=True).calls(lambda x: sys.stderr.write(x + "\n")))
    def test_error_includes_stderr_if_given_and_hidden(self):
        """
        error() correctly prints stderr if it was previously hidden
        """
        # Mostly to catch regression bug(s)
        # The patched abort() just echoes its message to the mocked stderr,
        # so whatever error() put into the message can be inspected there.
        stderr = "this is my stderr"
        with hide('stderr'):
            error("error message", func=utils.abort, stderr=stderr)
        assert_contains(stderr, sys.stderr.getvalue())

    @mock_streams('stderr')
    def test_warnings_print_magenta_if_colorize_on(self):
        """
        colorize_errors=True, error(func=warn) => warning is printed magenta
        """
        with settings(colorize_errors=True):
            error("oh god", func=utils.warn, stderr="oops")
        # can't use assert_contains as ANSI codes contain regex specialchars
        eq_(magenta("\nWarning: oh god\n\n"), sys.stderr.getvalue())

    @mock_streams('stderr')
    @raises(SystemExit)
    def test_errors_print_red_if_colorize_on(self):
        """
        colorize_errors=True, error(func=abort) => abort output is printed red
        """
        try:
            with settings(colorize_errors=True):
                error("oh god", func=utils.abort, stderr="oops")
        except SystemExit:
            # The real abort() exits from inside error(), so any statement
            # placed after the call is never reached. Inspect the output
            # here, then re-raise so @raises still sees the SystemExit.
            printed = sys.stderr.getvalue()
            # red("|") is <start code> + "|" + <reset code>; keep the start
            # code only. The exact wording abort() prints is deliberately
            # not pinned here - only that it is red and carries the message.
            red_start = red("|").split("|")[0]
            assert printed.startswith(red_start), \
                "abort output is not colored red: %r" % (printed,)
            # Plain text without regex metacharacters, so assert_contains
            # is safe to use for the message itself.
            assert_contains("oh god", printed)
            raise


class TestRingBuffer(TestCase):
    """
    Checks RingBuffer's append/extend/+= behavior for a buffer capped at
    five items, plus the maxlen=0 and maxlen=None corner cases.
    """

    def setUp(self):
        # Fresh, empty five-slot buffer for every test.
        self.ring = RingBuffer([], maxlen=5)

    def test_append_empty(self):
        self.ring.append('x')
        eq_(self.ring, list("x"))

    def test_append_full(self):
        # Appending to a full buffer pushes the oldest item out.
        self.ring.extend("abcde")
        self.ring.append('f')
        eq_(self.ring, list("bcdef"))

    def test_extend_empty(self):
        self.ring.extend("abc")
        eq_(self.ring, list("abc"))

    def test_extend_overrun(self):
        # 3 + 4 items into 5 slots: only the newest five remain.
        for chunk in ("abc", "defg"):
            self.ring.extend(chunk)
        eq_(self.ring, list("cdefg"))

    def test_extend_full(self):
        for chunk in ("abcde", "fgh"):
            self.ring.extend(chunk)
        eq_(self.ring, list("defgh"))

    def test_plus_equals(self):
        self.ring += "abcdefgh"
        eq_(self.ring, list("defgh"))

    def test_oversized_extend(self):
        # A single extend larger than maxlen keeps just the tail.
        self.ring.extend("abcdefghijklmn")
        eq_(self.ring, list("jklmn"))

    def test_zero_maxlen_append(self):
        empty = RingBuffer([], maxlen=0)
        empty.append('a')
        eq_(empty, list())

    def test_zero_maxlen_extend(self):
        empty = RingBuffer([], maxlen=0)
        empty.extend('abcdefghijklmnop')
        eq_(empty, list())

    def test_None_maxlen_append(self):
        unbounded = RingBuffer([], maxlen=None)
        unbounded.append('a')
        eq_(unbounded, list("a"))

    def test_None_maxlen_extend(self):
        # With maxlen=None nothing is discarded.
        unbounded = RingBuffer([], maxlen=None)
        unbounded.extend('abcdefghijklmnop')
        joined = ''.join(unbounded)
        eq_(joined, 'abcdefghijklmnop')