# test/test_pidfile.py
# Part of ‘python-daemon’, an implementation of PEP 3143.
#
# This is free software, and you are welcome to redistribute it under
# certain conditions; see the end of this file for copyright
# information, grant of license, and disclaimer of warranty.

""" Unit test for ‘pidfile’ module. """

import builtins
import errno
import functools
import io
import itertools
import os
import tempfile
import unittest.mock

import lockfile

import daemon.pidfile

from . import scaffold


class FakeFileDescriptorStringIO(io.StringIO, object):
    """ A StringIO class that fakes a file descriptor. """

    _fileno_generator = itertools.count()

    def __init__(self, *args, **kwargs):
        self._fileno = next(self._fileno_generator)
        super().__init__(*args, **kwargs)

    def fileno(self):
        return self._fileno

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


def make_pidlockfile_scenarios():
    """ Make a collection of scenarios for testing `PIDLockFile` instances.

        :return: A collection of scenarios for tests involving
            `PIDLockfFile` instances.

        The collection is a mapping from scenario name to a dictionary of
        scenario attributes.
        """

    fake_current_pid = 235
    fake_other_pid = 8642
    fake_pidfile_path = tempfile.mktemp()

    fake_pidfile_empty = FakeFileDescriptorStringIO()
    fake_pidfile_current_pid = FakeFileDescriptorStringIO(
            "{pid:d}\n".format(pid=fake_current_pid))
    fake_pidfile_other_pid = FakeFileDescriptorStringIO(
            "{pid:d}\n".format(pid=fake_other_pid))
    fake_pidfile_bogus = FakeFileDescriptorStringIO(
            "b0gUs")

    scenarios = {
            'simple': {},
            'not-exist': {
                'open_func_name': 'fake_open_nonexist',
                'os_open_func_name': 'fake_os_open_nonexist',
                },
            'not-exist-write-denied': {
                'open_func_name': 'fake_open_nonexist',
                'os_open_func_name': 'fake_os_open_nonexist',
                },
            'not-exist-write-busy': {
                'open_func_name': 'fake_open_nonexist',
                'os_open_func_name': 'fake_os_open_nonexist',
                },
            'exist-read-denied': {
                'open_func_name': 'fake_open_read_denied',
                'os_open_func_name': 'fake_os_open_read_denied',
                },
            'exist-locked-read-denied': {
                'locking_pid': fake_other_pid,
                'open_func_name': 'fake_open_read_denied',
                'os_open_func_name': 'fake_os_open_read_denied',
                },
            'exist-empty': {},
            'exist-invalid': {
                'pidfile': fake_pidfile_bogus,
                },
            'exist-current-pid': {
                'pidfile': fake_pidfile_current_pid,
                'pidfile_pid': fake_current_pid,
                },
            'exist-current-pid-locked': {
                'pidfile': fake_pidfile_current_pid,
                'pidfile_pid': fake_current_pid,
                'locking_pid': fake_current_pid,
                },
            'exist-other-pid': {
                'pidfile': fake_pidfile_other_pid,
                'pidfile_pid': fake_other_pid,
                },
            'exist-other-pid-locked': {
                'pidfile': fake_pidfile_other_pid,
                'pidfile_pid': fake_other_pid,
                'locking_pid': fake_other_pid,
                },
            }

    for scenario in scenarios.values():
        scenario['pid'] = fake_current_pid
        scenario['pidfile_path'] = fake_pidfile_path
        if 'pidfile' not in scenario:
            scenario['pidfile'] = fake_pidfile_empty
        if 'pidfile_pid' not in scenario:
            scenario['pidfile_pid'] = None
        if 'locking_pid' not in scenario:
            scenario['locking_pid'] = None
        if 'open_func_name' not in scenario:
            scenario['open_func_name'] = 'fake_open_okay'
        if 'os_open_func_name' not in scenario:
            scenario['os_open_func_name'] = 'fake_os_open_okay'

    return scenarios


def setup_pidfile_fixtures(testcase):
    """ Set up common fixtures for PID file test cases.

        :param testcase: A `TestCase` instance to decorate.

        Decorate the `testcase` with attributes to be fixtures for tests
        involving `PIDLockFile` instances.
        """
    scenarios = make_pidlockfile_scenarios()
    testcase.pidlockfile_scenarios = scenarios

    def get_scenario_option(testcase, key, default=None):
        value = default
        try:
            value = testcase.scenario[key]
        except (NameError, TypeError, AttributeError, KeyError):
            pass
        return value

    func_patcher_os_getpid = unittest.mock.patch.object(
            os, "getpid",
            return_value=scenarios['simple']['pid'])
    func_patcher_os_getpid.start()
    testcase.addCleanup(func_patcher_os_getpid.stop)

    def make_fake_open_funcs(testcase):

        def fake_open_nonexist(filename, mode, buffering):
            if mode.startswith('r'):
                error = FileNotFoundError(
                        "No such file {filename!r}".format(
                            filename=filename))
                raise error
            else:
                result = testcase.scenario['pidfile']
            return result

        def fake_open_read_denied(filename, mode, buffering):
            if mode.startswith('r'):
                error = PermissionError(
                        "Read denied on {filename!r}".format(
                            filename=filename))
                raise error
            else:
                result = testcase.scenario['pidfile']
            return result

        def fake_open_okay(filename, mode, buffering):
            result = testcase.scenario['pidfile']
            return result

        def fake_os_open_nonexist(filename, flags, mode):
            if (flags & os.O_CREAT):
                result = testcase.scenario['pidfile'].fileno()
            else:
                error = FileNotFoundError(
                        "No such file {filename!r}".format(
                            filename=filename))
                raise error
            return result

        def fake_os_open_read_denied(filename, flags, mode):
            if (flags & os.O_CREAT):
                result = testcase.scenario['pidfile'].fileno()
            else:
                error = PermissionError(
                        "Read denied on {filename!r}".format(
                            filename=filename))
                raise error
            return result

        def fake_os_open_okay(filename, flags, mode):
            result = testcase.scenario['pidfile'].fileno()
            return result

        funcs = dict(
                (name, obj) for (name, obj) in vars().items()
                if callable(obj))

        return funcs

    testcase.fake_pidfile_open_funcs = make_fake_open_funcs(testcase)

    def fake_open(filename, mode='rt', buffering=None):
        scenario_path = get_scenario_option(testcase, 'pidfile_path')
        if filename == scenario_path:
            func_name = testcase.scenario['open_func_name']
            fake_open_func = testcase.fake_pidfile_open_funcs[func_name]
            result = fake_open_func(filename, mode, buffering)
        else:
            result = FakeFileDescriptorStringIO()
        return result

    mock_open = unittest.mock.mock_open()
    mock_open.side_effect = fake_open

    func_patcher_builtin_open = unittest.mock.patch.object(
            builtins, "open",
            new=mock_open)
    func_patcher_builtin_open.start()
    testcase.addCleanup(func_patcher_builtin_open.stop)

    def fake_os_open(filename, flags, mode=None):
        scenario_path = get_scenario_option(testcase, 'pidfile_path')
        if filename == scenario_path:
            func_name = testcase.scenario['os_open_func_name']
            fake_os_open_func = testcase.fake_pidfile_open_funcs[func_name]
            result = fake_os_open_func(filename, flags, mode)
        else:
            result = FakeFileDescriptorStringIO().fileno()
        return result

    mock_os_open = unittest.mock.MagicMock(side_effect=fake_os_open)

    func_patcher_os_open = unittest.mock.patch.object(
            os, "open",
            new=mock_os_open)
    func_patcher_os_open.start()
    testcase.addCleanup(func_patcher_os_open.stop)

    def fake_os_fdopen(fd, mode='rt', buffering=None):
        if fd == testcase.scenario['pidfile'].fileno():
            result = testcase.scenario['pidfile']
        else:
            raise OSError(errno.EBADF, "Bad file descriptor")
        return result

    mock_os_fdopen = unittest.mock.MagicMock(side_effect=fake_os_fdopen)

    func_patcher_os_fdopen = unittest.mock.patch.object(
            os, "fdopen",
            new=mock_os_fdopen)
    func_patcher_os_fdopen.start()
    testcase.addCleanup(func_patcher_os_fdopen.stop)


def make_lockfile_method_fakes(scenario):
    """ Make common fake methods for lockfile class.

        :param scenario: A scenario for testing with PIDLockFile.
        :return: A mapping from normal function name to the corresponding
            fake function.

        Each fake function behaves appropriately for the specified `scenario`.
        """

    def fake_func_read_pid():
        return scenario['pidfile_pid']

    def fake_func_is_locked():
        return (scenario['locking_pid'] is not None)

    def fake_func_i_am_locking():
        return (
                scenario['locking_pid'] == scenario['pid'])

    def fake_func_acquire(timeout=None):
        if scenario['locking_pid'] is not None:
            raise lockfile.AlreadyLocked()
        scenario['locking_pid'] = scenario['pid']

    def fake_func_release():
        if scenario['locking_pid'] is None:
            raise lockfile.NotLocked()
        if scenario['locking_pid'] != scenario['pid']:
            raise lockfile.NotMyLock()
        scenario['locking_pid'] = None

    def fake_func_break_lock():
        scenario['locking_pid'] = None

    fake_methods = dict(
            (
                func_name.replace('fake_func_', ''),
                unittest.mock.MagicMock(side_effect=fake_func))
            for (func_name, fake_func) in vars().items()
            if func_name.startswith('fake_func_'))

    return fake_methods


def apply_lockfile_method_mocks(mock_lockfile, testcase, scenario):
    """ Apply common fake methods to mock lockfile class.

        :param mock_lockfile: An object providing the `LockFile` interface.
        :param testcase: The `TestCase` instance providing the context for
            the patch.
        :param scenario: The `PIDLockFile` test scenario to use.

        Mock the `LockFile` methods of `mock_lockfile`, by applying fake
        methods customised for `scenario`. The mock is does by a patch
        within the context of `testcase`.
        """
    fake_methods = dict(
            (func_name, fake_func)
            for (func_name, fake_func) in (
                make_lockfile_method_fakes(scenario).items())
            if func_name not in ['read_pid'])

    for (func_name, fake_func) in fake_methods.items():
        func_patcher = unittest.mock.patch.object(
                mock_lockfile, func_name,
                new=fake_func)
        func_patcher.start()
        testcase.addCleanup(func_patcher.stop)


def setup_pidlockfile_fixtures(testcase, scenario_name=None):
    """ Set up common fixtures for PIDLockFile test cases.

        :param testcase: The `TestCase` instance to decorate.
        :param scenario_name: The name of the `PIDLockFile` scenario to use.

        Decorate the `testcase` with attributes that are fixtures for test
        cases involving `PIDLockFile` instances.`
        """

    setup_pidfile_fixtures(testcase)

    for func_name in [
            'write_pid_to_pidfile',
            'remove_existing_pidfile',
            ]:
        func_patcher = unittest.mock.patch.object(
                lockfile.pidlockfile, func_name)
        func_patcher.start()
        testcase.addCleanup(func_patcher.stop)


class TimeoutPIDLockFile_TestCase(scaffold.TestCase):
    """ Test cases for ‘TimeoutPIDLockFile’ class. """

    def setUp(self):
        """ Set up test fixtures. """
        super().setUp()

        pidlockfile_scenarios = make_pidlockfile_scenarios()
        self.pidlockfile_scenario = pidlockfile_scenarios['simple']

        self.scenario = {
                'pidfile_path': self.pidlockfile_scenario['pidfile_path'],
                'acquire_timeout': self.getUniqueInteger(),
                }

        self.test_kwargs = dict(
                path=self.scenario['pidfile_path'],
                acquire_timeout=self.scenario['acquire_timeout'],
                )
        self.test_instance = daemon.pidfile.TimeoutPIDLockFile(
                **self.test_kwargs)

    def test_inherits_from_pidlockfile(self):
        """ Should inherit from PIDLockFile. """
        instance = self.test_instance
        self.assertIsInstance(instance, lockfile.pidlockfile.PIDLockFile)

    def test_init_has_expected_signature(self):
        """ Should have expected signature for ‘__init__’. """
        def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass
        test_func.__name__ = str('__init__')
        self.assertFunctionSignatureMatch(
                test_func,
                daemon.pidfile.TimeoutPIDLockFile.__init__)

    def test_has_specified_acquire_timeout(self):
        """ Should have specified ‘acquire_timeout’ value. """
        instance = self.test_instance
        expected_timeout = self.test_kwargs['acquire_timeout']
        self.assertEqual(expected_timeout, instance.acquire_timeout)

    @unittest.mock.patch.object(
            lockfile.pidlockfile.PIDLockFile, "__init__",
            autospec=True)
    def test_calls_superclass_init(self, mock_init):
        """ Should call the superclass ‘__init__’. """
        expected_path = self.test_kwargs['path']
        instance = daemon.pidfile.TimeoutPIDLockFile(**self.test_kwargs)
        mock_init.assert_called_with(instance, expected_path)

    @unittest.mock.patch.object(
            lockfile.pidlockfile.PIDLockFile, "acquire",
            autospec=True)
    def test_acquire_uses_specified_timeout(self, mock_func_acquire):
        """ Should call the superclass ‘acquire’ with specified timeout. """
        instance = self.test_instance
        test_timeout = self.getUniqueInteger()
        expected_timeout = test_timeout
        instance.acquire(test_timeout)
        mock_func_acquire.assert_called_with(instance, expected_timeout)

    @unittest.mock.patch.object(
            lockfile.pidlockfile.PIDLockFile, "acquire",
            autospec=True)
    def test_acquire_uses_stored_timeout_by_default(self, mock_func_acquire):
        """
        Should call superclass ‘acquire’ with stored timeout by default.
        """
        instance = self.test_instance
        test_timeout = self.test_kwargs['acquire_timeout']
        expected_timeout = test_timeout
        instance.acquire()
        mock_func_acquire.assert_called_with(instance, expected_timeout)


# Copyright © 2008–2022 Ben Finney <ben+python@benfinney.id.au>
#
# This is free software: you may copy, modify, and/or distribute this work
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; version 3 of that license or any later version.
# No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.


# Local variables:
# coding: utf-8
# mode: python
# End:
# vim: fileencoding=utf-8 filetype=python :
