File: test_add_option_context.py

package info (click to toggle)
loguru 0.7.3-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,556 kB
  • sloc: python: 13,164; javascript: 49; makefile: 14
file content (74 lines) | stat: -rw-r--r-- 2,719 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import multiprocessing
import os
from unittest.mock import patch

import pytest

from loguru import logger


@pytest.fixture
def reset_start_method():
    yield
    multiprocessing.set_start_method(None, force=True)


@pytest.mark.usefixtures("reset_start_method")
def test_using_multiprocessing_directly_if_context_is_none():
    logger.add(lambda _: None, enqueue=True, context=None)
    assert multiprocessing.get_start_method(allow_none=True) is not None


@pytest.mark.skipif(os.name == "nt", reason="Windows does not support forking")
@pytest.mark.parametrize("context_name", ["fork", "forkserver"])
def test_fork_context_as_string(context_name):
    context = multiprocessing.get_context(context_name)
    with patch.object(type(context), "Lock", wraps=context.Lock) as mock:
        logger.add(lambda _: None, context=context_name, enqueue=True)
        assert mock.called
    assert multiprocessing.get_start_method(allow_none=True) is None


def test_spawn_context_as_string():
    context = multiprocessing.get_context("spawn")
    with patch.object(type(context), "Lock", wraps=context.Lock) as mock:
        logger.add(lambda _: None, context="spawn", enqueue=True)
        assert mock.called
    assert multiprocessing.get_start_method(allow_none=True) is None


@pytest.mark.skipif(os.name == "nt", reason="Windows does not support forking")
@pytest.mark.parametrize("context_name", ["fork", "forkserver"])
def test_fork_context_as_object(context_name):
    context = multiprocessing.get_context(context_name)
    with patch.object(type(context), "Lock", wraps=context.Lock) as mock:
        logger.add(lambda _: None, context=context, enqueue=True)
        assert mock.called
    assert multiprocessing.get_start_method(allow_none=True) is None


def test_spawn_context_as_object():
    context = multiprocessing.get_context("spawn")
    with patch.object(type(context), "Lock", wraps=context.Lock) as mock:
        logger.add(lambda _: None, context=context, enqueue=True)
        assert mock.called
    assert multiprocessing.get_start_method(allow_none=True) is None


def test_global_start_method_is_none_if_enqueue_is_false():
    logger.add(lambda _: None, enqueue=False, context=None)
    assert multiprocessing.get_start_method(allow_none=True) is None


def test_invalid_context_name():
    with pytest.raises(ValueError, match=r"cannot find context for"):
        logger.add(lambda _: None, context="foobar")


@pytest.mark.parametrize("context", [42, object()])
def test_invalid_context_object(context):
    with pytest.raises(
        TypeError,
        match=r"Invalid context, it should be a string or a multiprocessing context",
    ):
        logger.add(lambda _: None, context=context)