#    Copyright 2011 OpenStack Foundation
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import configparser
import logging
import logging.handlers
import os
import tempfile
from unittest import mock
import uuid

import fixtures
import testtools

from oslo_rootwrap import cmd
from oslo_rootwrap import daemon
from oslo_rootwrap import filters
from oslo_rootwrap import subprocess
from oslo_rootwrap import wrapper


class RootwrapLoaderTestCase(testtools.TestCase):

    def test_privsep_in_loader(self):
        privsep = ["privsep-helper", "--context", "foo"]
        filterlist = wrapper.load_filters([])

        # mock out get_exec because
        with mock.patch.object(filters.CommandFilter, 'get_exec') as ge:
            ge.return_value = "/fake/privsep-helper"
            filtermatch = wrapper.match_filter(filterlist, privsep)

            self.assertIsNotNone(filtermatch)
            self.assertEqual(["/fake/privsep-helper", "--context", "foo"],
                             filtermatch.get_command(privsep))

    def test_strict_switched_off_in_configparser(self):
        temp_dir = self.useFixture(fixtures.TempDir()).path
        os.mkdir(os.path.join(temp_dir, 'nested'))
        temp_file = os.path.join(temp_dir, 'test.conf')
        f = open(temp_file, 'w')
        f.write("""[Filters]
privsep: PathFilter, privsep-helper, root
privsep: PathFilter, privsep-helper, root
""")
        f.close()
        filterlist = wrapper.load_filters([temp_dir])
        self.assertIsNotNone(filterlist)


class RootwrapTestCase(testtools.TestCase):
    if os.path.exists('/sbin/ip'):
        _ip = '/sbin/ip'
    else:
        _ip = '/bin/ip'

    def setUp(self):
        super().setUp()
        self.filters = [
            filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
            filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
            filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
            filters.CommandFilter("/nonexistent/cat", "root"),
            filters.CommandFilter("/bin/cat", "root")  # Keep this one last
        ]

    def test_CommandFilter(self):
        f = filters.CommandFilter("sleep", 'root', '10')
        self.assertFalse(f.match(["sleep2"]))

        # verify that any arguments are accepted
        self.assertTrue(f.match(["sleep"]))
        self.assertTrue(f.match(["sleep", "anything"]))
        self.assertTrue(f.match(["sleep", "10"]))
        f = filters.CommandFilter("sleep", 'root')
        self.assertTrue(f.match(["sleep", "10"]))

    def test_empty_commandfilter(self):
        f = filters.CommandFilter("sleep", "root")
        self.assertFalse(f.match([]))
        self.assertFalse(f.match(None))

    def test_empty_regexpfilter(self):
        f = filters.RegExpFilter("sleep", "root", "sleep")
        self.assertFalse(f.match([]))
        self.assertFalse(f.match(None))

    def test_empty_invalid_regexpfilter(self):
        f = filters.RegExpFilter("sleep", "root")
        self.assertFalse(f.match(["anything"]))
        self.assertFalse(f.match([]))

    def test_RegExpFilter_match(self):
        usercmd = ["ls", "/root"]
        filtermatch = wrapper.match_filter(self.filters, usercmd)
        self.assertFalse(filtermatch is None)
        self.assertEqual(["/bin/ls", "/root"],
                         filtermatch.get_command(usercmd))

    def test_RegExpFilter_reject(self):
        usercmd = ["ls", "root"]
        self.assertRaises(wrapper.NoFilterMatched,
                          wrapper.match_filter, self.filters, usercmd)

    def test_missing_command(self):
        valid_but_missing = ["foo_bar_not_exist"]
        invalid = ["foo_bar_not_exist_and_not_matched"]
        self.assertRaises(wrapper.FilterMatchNotExecutable,
                          wrapper.match_filter,
                          self.filters, valid_but_missing)
        self.assertRaises(wrapper.NoFilterMatched,
                          wrapper.match_filter, self.filters, invalid)

    def _test_EnvFilter_as_DnsMasq(self, config_file_arg):
        usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar',
                   'dnsmasq', 'foo']
        f = filters.EnvFilter("env", "root", config_file_arg + '=A',
                              'NETWORK_ID=', "/usr/bin/dnsmasq")
        self.assertTrue(f.match(usercmd))
        self.assertEqual(['/usr/bin/dnsmasq', 'foo'], f.get_command(usercmd))
        env = f.get_environment(usercmd)
        self.assertEqual('A', env.get(config_file_arg))
        self.assertEqual('foobar', env.get('NETWORK_ID'))

    def test_EnvFilter(self):
        envset = ['A=/some/thing', 'B=somethingelse']
        envcmd = ['env'] + envset
        realcmd = ['sleep', '10']
        usercmd = envcmd + realcmd

        f = filters.EnvFilter("env", "root", "A=", "B=ignored", "sleep")
        # accept with leading env
        self.assertTrue(f.match(envcmd + ["sleep"]))
        # accept without leading env
        self.assertTrue(f.match(envset + ["sleep"]))

        # any other command does not match
        self.assertFalse(f.match(envcmd + ["sleep2"]))
        self.assertFalse(f.match(envset + ["sleep2"]))

        # accept any trailing arguments
        self.assertTrue(f.match(usercmd))

        # require given environment variables to match
        self.assertFalse(f.match([envcmd, 'C=ELSE']))
        self.assertFalse(f.match(['env', 'C=xx']))
        self.assertFalse(f.match(['env', 'A=xx']))

        # require env command to be given
        # (otherwise CommandFilters should match
        self.assertFalse(f.match(realcmd))
        # require command to match
        self.assertFalse(f.match(envcmd))
        self.assertFalse(f.match(envcmd[1:]))

        # ensure that the env command is stripped when executing
        self.assertEqual(realcmd, f.exec_args(usercmd))
        env = f.get_environment(usercmd)
        # check that environment variables are set
        self.assertEqual('/some/thing', env.get('A'))
        self.assertEqual('somethingelse', env.get('B'))
        self.assertNotIn('sleep', env.keys())

    def test_EnvFilter_without_leading_env(self):
        envset = ['A=/some/thing', 'B=somethingelse']
        envcmd = ['env'] + envset
        realcmd = ['sleep', '10']

        f = filters.EnvFilter("sleep", "root", "A=", "B=ignored")

        # accept without leading env
        self.assertTrue(f.match(envset + ["sleep"]))

        self.assertEqual(realcmd, f.get_command(envcmd + realcmd))
        self.assertEqual(realcmd, f.get_command(envset + realcmd))

        env = f.get_environment(envset + realcmd)
        # check that environment variables are set
        self.assertEqual('/some/thing', env.get('A'))
        self.assertEqual('somethingelse', env.get('B'))
        self.assertNotIn('sleep', env.keys())

    def test_KillFilter(self):
        if not os.path.exists("/proc/%d" % os.getpid()):
            self.skipTest("Test requires /proc filesystem (procfs)")
        p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        try:
            f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
            f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
            f3 = filters.KillFilter("root", "/usr/bin/coreutils", "-9", "-HUP")
            usercmd = ['kill', '-ALRM', p.pid]
            # Incorrect signal should fail
            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
            usercmd = ['kill', p.pid]
            # Providing no signal should fail
            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
            # Providing matching signal should be allowed
            usercmd = ['kill', '-9', p.pid]
            self.assertTrue(f.match(usercmd) or
                            f2.match(usercmd) or
                            f3.match(usercmd))

            f = filters.KillFilter("root", "/bin/cat")
            f2 = filters.KillFilter("root", "/usr/bin/cat")
            f3 = filters.KillFilter("root", "/usr/bin/coreutils")
            usercmd = ['kill', os.getpid()]
            # Our own PID does not match /bin/sleep, so it should fail
            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
            usercmd = ['kill', 999999]
            # Nonexistent PID should fail
            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
            usercmd = ['kill', p.pid]
            # Providing no signal should work
            self.assertTrue(f.match(usercmd) or
                            f2.match(usercmd) or
                            f3.match(usercmd))

            # verify that relative paths are matched against $PATH
            f = filters.KillFilter("root", "cat")
            f2 = filters.KillFilter("root", "coreutils")
            # Our own PID does not match so it should fail
            usercmd = ['kill', os.getpid()]
            self.assertFalse(f.match(usercmd))
            # Filter should find cat in /bin or /usr/bin
            usercmd = ['kill', p.pid]
            self.assertTrue(f.match(usercmd) or f2.match(usercmd))
            # Filter shouldn't be able to find binary in $PATH, so fail
            with fixtures.EnvironmentVariable("PATH", "/foo:/bar"):
                self.assertFalse(f.match(usercmd))
            # ensure that unset $PATH is not causing an exception
            with fixtures.EnvironmentVariable("PATH"):
                self.assertFalse(f.match(usercmd))
        finally:
            # Terminate the "cat" process and wait for it to finish
            p.terminate()
            p.wait()

    def test_KillFilter_no_raise(self):
        """Makes sure ValueError from bug 926412 is gone."""
        f = filters.KillFilter("root", "")
        # Providing anything other than kill should be False
        usercmd = ['notkill', 999999]
        self.assertFalse(f.match(usercmd))
        # Providing something that is not a pid should be False
        usercmd = ['kill', 'notapid']
        self.assertFalse(f.match(usercmd))
        # no arguments should also be fine
        self.assertFalse(f.match([]))
        self.assertFalse(f.match(None))

    def test_KillFilter_deleted_exe(self):
        """Makes sure deleted exe's are killed correctly."""
        command = "/bin/commandddddd"
        f = filters.KillFilter("root", command)
        usercmd = ['kill', 1234]
        # Providing no signal should work
        with mock.patch('os.readlink') as readlink:
            readlink.return_value = command + ' (deleted)'
            with mock.patch('os.path.isfile') as exists:
                def fake_exists(path):
                    return path == command
                exists.side_effect = fake_exists
                self.assertTrue(f.match(usercmd))

    @mock.patch('os.readlink')
    @mock.patch('os.path.isfile')
    def test_KillFilter_upgraded_exe(self, mock_isfile, mock_readlink):
        """Makes sure upgraded exe's are killed correctly."""
        f = filters.KillFilter("root", "/bin/commandddddd")
        command = "/bin/commandddddd"
        usercmd = ['kill', 1234]

        def fake_exists(path):
            return path == command

        mock_readlink.return_value = command + '\0\05190bfb2 (deleted)'
        mock_isfile.side_effect = fake_exists
        self.assertTrue(f.match(usercmd))

    @mock.patch('os.readlink')
    @mock.patch('os.path.isfile')
    @mock.patch('os.path.exists')
    @mock.patch('os.access')
    def test_KillFilter_renamed_exe(self, mock_access, mock_exists,
                                    mock_isfile, mock_readlink):
        """Makes sure renamed exe's are killed correctly."""
        command = "/bin/commandddddd"
        f = filters.KillFilter("root", command)
        usercmd = ['kill', 1234]

        def fake_os_func(path, *args):
            return path == command

        mock_readlink.return_value = command + ';90bfb2 (deleted)'
        m = mock.mock_open(read_data=command)
        with mock.patch("builtins.open", m, create=True):
            mock_isfile.side_effect = fake_os_func
            mock_exists.side_effect = fake_os_func
            mock_access.side_effect = fake_os_func
            self.assertTrue(f.match(usercmd))

    def test_ReadFileFilter(self):
        goodfn = '/good/file.name'
        f = filters.ReadFileFilter(goodfn)
        usercmd = ['cat', '/bad/file']
        self.assertFalse(f.match(['cat', '/bad/file']))
        usercmd = ['cat', goodfn]
        self.assertEqual(['/bin/cat', goodfn], f.get_command(usercmd))
        self.assertTrue(f.match(usercmd))

    def test_IpFilter_non_netns(self):
        f = filters.IpFilter(self._ip, 'root')
        self.assertTrue(f.match(['ip', 'link', 'list']))
        self.assertTrue(f.match(['ip', '-s', 'link', 'list']))
        self.assertTrue(f.match(['ip', '-s', '-v', 'netns', 'add']))
        self.assertTrue(f.match(['ip', 'link', 'set', 'interface',
                                 'netns', 'somens']))

    def test_IpFilter_netns(self):
        f = filters.IpFilter(self._ip, 'root')
        self.assertFalse(f.match(['ip', 'netns', 'exec', 'foo']))
        self.assertFalse(f.match(['ip', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', '-s', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', '-l', '42', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', 'net', 'exec', 'foo']))
        self.assertFalse(f.match(['ip', 'netns', 'e', 'foo']))

    def _test_IpFilter_netns_helper(self, action):
        f = filters.IpFilter(self._ip, 'root')
        self.assertTrue(f.match(['ip', 'link', action]))

    def test_IpFilter_netns_add(self):
        self._test_IpFilter_netns_helper('add')

    def test_IpFilter_netns_delete(self):
        self._test_IpFilter_netns_helper('delete')

    def test_IpFilter_netns_list(self):
        self._test_IpFilter_netns_helper('list')

    def test_IpNetnsExecFilter_match(self):
        f = filters.IpNetnsExecFilter(self._ip, 'root')
        self.assertTrue(
            f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
        self.assertTrue(f.match(['ip', 'net', 'exec', 'foo', 'bar']))
        self.assertTrue(f.match(['ip', 'netn', 'e', 'foo', 'bar']))
        self.assertTrue(f.match(['ip', 'net', 'e', 'foo', 'bar']))
        self.assertTrue(f.match(['ip', 'net', 'exe', 'foo', 'bar']))

    def test_IpNetnsExecFilter_nomatch(self):
        f = filters.IpNetnsExecFilter(self._ip, 'root')
        self.assertFalse(f.match(['ip', 'link', 'list']))
        self.assertFalse(f.match(['ip', 'foo', 'bar', 'netns']))
        self.assertFalse(f.match(['ip', '-s', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', '-l', '42', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', 'netns exec', 'foo', 'bar', 'baz']))
        self.assertFalse(f.match([]))

        # verify that at least a NS is given
        self.assertFalse(f.match(['ip', 'netns', 'exec']))

    def test_IpNetnsExecFilter_nomatch_nonroot(self):
        f = filters.IpNetnsExecFilter(self._ip, 'user')
        self.assertFalse(
            f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))

    def test_match_filter_recurses_exec_command_filter_matches(self):
        filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
                       filters.IpFilter(self._ip, 'root')]
        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']

        self.assertIsNotNone(wrapper.match_filter(filter_list, args))

    def test_match_filter_recurses_exec_command_matches_user(self):
        filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
                       filters.IpFilter(self._ip, 'user')]
        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']

        # Currently ip netns exec requires root, so verify that
        # no non-root filter is matched, as that would escalate privileges
        self.assertRaises(wrapper.NoFilterMatched,
                          wrapper.match_filter, filter_list, args)

    def test_match_filter_recurses_exec_command_filter_does_not_match(self):
        filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
                       filters.IpFilter(self._ip, 'root')]
        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
                'ip', 'link', 'list']

        self.assertRaises(wrapper.NoFilterMatched,
                          wrapper.match_filter, filter_list, args)

    def test_ChainingRegExpFilter_match(self):
        filter_list = [filters.ChainingRegExpFilter('nice', 'root',
                                                    'nice', r'-?\d+'),
                       filters.CommandFilter('cat', 'root')]
        args = ['nice', '5', 'cat', '/a']
        dirs = ['/bin', '/usr/bin']

        self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))

    def test_ChainingRegExpFilter_not_match(self):
        filter_list = [filters.ChainingRegExpFilter('nice', 'root',
                                                    'nice', r'-?\d+'),
                       filters.CommandFilter('cat', 'root')]
        args_invalid = (['nice', '5', 'ls', '/a'],
                        ['nice', '--5', 'cat', '/a'],
                        ['nice2', '5', 'cat', '/a'],
                        ['nice', 'cat', '/a'],
                        ['nice', '5'])
        dirs = ['/bin', '/usr/bin']

        for args in args_invalid:
            self.assertRaises(wrapper.NoFilterMatched,
                              wrapper.match_filter, filter_list, args, dirs)

    def test_ChainingRegExpFilter_multiple(self):
        filter_list = [filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
                                                    '-c[0-3]'),
                       filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
                                                    '-c[0-3]', '-n[0-7]'),
                       filters.CommandFilter('cat', 'root')]
        # both filters match to ['ionice', '-c2'], but only the second accepts
        args = ['ionice', '-c2', '-n7', 'cat', '/a']
        dirs = ['/bin', '/usr/bin']

        self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))

    def test_ReadFileFilter_empty_args(self):
        goodfn = '/good/file.name'
        f = filters.ReadFileFilter(goodfn)
        self.assertFalse(f.match([]))
        self.assertFalse(f.match(None))

    def test_exec_dirs_search(self):
        # This test supposes you have /bin/cat or /usr/bin/cat locally
        f = filters.CommandFilter("cat", "root")
        usercmd = ['cat', '/f']
        self.assertTrue(f.match(usercmd))
        self.assertTrue(f.get_command(usercmd,
                                      exec_dirs=['/bin', '/usr/bin'])
                        in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))

    def test_skips(self):
        # Check that all filters are skipped and that the last matches
        usercmd = ["cat", "/"]
        filtermatch = wrapper.match_filter(self.filters, usercmd)
        self.assertTrue(filtermatch is self.filters[-1])

    def test_RootwrapConfig(self):
        raw = configparser.RawConfigParser()

        # Empty config should raise configparser.Error
        self.assertRaises(configparser.Error,
                          wrapper.RootwrapConfig, raw)

        # Check default values
        raw.set('DEFAULT', 'filters_path', '/a,/b')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(['/a', '/b'], config.filters_path)
        self.assertEqual(os.environ["PATH"].split(':'), config.exec_dirs)

        with fixtures.EnvironmentVariable("PATH"):
            c = wrapper.RootwrapConfig(raw)
            self.assertEqual([], c.exec_dirs)

        self.assertFalse(config.use_syslog)
        self.assertEqual(logging.handlers.SysLogHandler.LOG_SYSLOG,
                         config.syslog_log_facility)
        self.assertEqual(logging.ERROR, config.syslog_log_level)

        # Check general values
        raw.set('DEFAULT', 'exec_dirs', '/a,/x')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(['/a', '/x'], config.exec_dirs)

        raw.set('DEFAULT', 'use_syslog', 'oui')
        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
        raw.set('DEFAULT', 'use_syslog', 'true')
        config = wrapper.RootwrapConfig(raw)
        self.assertTrue(config.use_syslog)

        raw.set('DEFAULT', 'syslog_log_facility', 'moo')
        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
        raw.set('DEFAULT', 'syslog_log_facility', 'local0')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(logging.handlers.SysLogHandler.LOG_LOCAL0,
                         config.syslog_log_facility)
        raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(logging.handlers.SysLogHandler.LOG_AUTH,
                         config.syslog_log_facility)

        raw.set('DEFAULT', 'syslog_log_level', 'bar')
        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
        raw.set('DEFAULT', 'syslog_log_level', 'INFO')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(logging.INFO, config.syslog_log_level)

    def test_getlogin(self):
        with mock.patch('os.getlogin') as os_getlogin:
            os_getlogin.return_value = 'foo'
            self.assertEqual('foo', wrapper._getlogin())

    def test_getlogin_bad(self):
        with mock.patch('os.getenv') as os_getenv:
            with mock.patch('os.getlogin') as os_getlogin:
                os_getenv.side_effect = [None, None, 'bar']
                os_getlogin.side_effect = OSError(
                    '[Errno 22] Invalid argument')
                self.assertEqual('bar', wrapper._getlogin())
                os_getlogin.assert_called_once_with()
                self.assertEqual(3, os_getenv.call_count)


class PathFilterTestCase(testtools.TestCase):
    def setUp(self):
        super().setUp()

        self.tmp_root_dir = tempfile.mkdtemp()
        tmpdir = fixtures.TempDir(self.tmp_root_dir)
        self.useFixture(tmpdir)

        self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)

        gen_name = lambda: str(uuid.uuid4())

        self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
        self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join(self.tmp_root_dir, 'some')
        self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
                                                 'some')
        self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')

        self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
                                                         gen_name())
        os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
                   self.TRAVERSAL_SYMLINK_WITHIN_DIR)

        self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
                                                          gen_name())
        os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
                   self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)

        self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
        os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)

        self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
        os.symlink(os.path.join(self.tmp_root_dir, 'some_file'),
                   self.SYMLINK_OUTSIDE_DIR)

    def test_empty_args(self):
        self.assertFalse(self.f.match([]))
        self.assertFalse(self.f.match(None))

    def test_argument_pass_constraint(self):
        f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')

        args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
        self.assertTrue(f.match(args))

    def test_argument_equality_constraint(self):
        temp_file_path = os.path.join(self.tmp_root_dir, 'spam/eggs')
        f = filters.PathFilter('/bin/chown', 'root', 'nova', temp_file_path)

        args = ['chown', 'nova', temp_file_path]
        self.assertTrue(f.match(args))

        args = ['chown', 'quantum', temp_file_path]
        self.assertFalse(f.match(args))

    def test_wrong_arguments_number(self):
        args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
        self.assertFalse(self.f.match(args))

    def test_wrong_exec_command(self):
        args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
        self.assertFalse(self.f.match(args))

    def test_match(self):
        args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
        self.assertTrue(self.f.match(args))

    def test_match_traversal(self):
        args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
        self.assertTrue(self.f.match(args))

    def test_match_symlink(self):
        args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
        self.assertTrue(self.f.match(args))

    def test_match_traversal_symlink(self):
        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
        self.assertTrue(self.f.match(args))

    def test_reject(self):
        args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
        self.assertFalse(self.f.match(args))

    def test_reject_traversal(self):
        args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
        self.assertFalse(self.f.match(args))

    def test_reject_symlink(self):
        args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
        self.assertFalse(self.f.match(args))

    def test_reject_traversal_symlink(self):
        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
        self.assertFalse(self.f.match(args))

    def test_get_command(self):
        args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
        expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]

        self.assertEqual(expected, self.f.get_command(args))

    def test_get_command_traversal(self):
        args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
        expected = ['/bin/chown', 'nova',
                    os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]

        self.assertEqual(expected, self.f.get_command(args))

    def test_get_command_symlink(self):
        args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
        expected = ['/bin/chown', 'nova',
                    os.path.realpath(self.SYMLINK_WITHIN_DIR)]

        self.assertEqual(expected, self.f.get_command(args))

    def test_get_command_traversal_symlink(self):
        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
        expected = ['/bin/chown', 'nova',
                    os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]

        self.assertEqual(expected, self.f.get_command(args))


class RunOneCommandTestCase(testtools.TestCase):
    def _test_returncode_helper(self, returncode, expected):
        with mock.patch.object(wrapper, 'start_subprocess') as mock_start:
            with mock.patch('sys.exit') as mock_exit:
                mock_start.return_value.wait.return_value = returncode
                cmd.run_one_command(None, mock.Mock(), None, None)
        mock_exit.assert_called_once_with(expected)

    def test_positive_returncode(self):
        self._test_returncode_helper(1, 1)

    def test_negative_returncode(self):
        self._test_returncode_helper(-1, 129)


class DaemonCleanupException(Exception):
    pass


class DaemonCleanupTestCase(testtools.TestCase):

    @mock.patch('os.chmod')
    @mock.patch('shutil.rmtree')
    @mock.patch('tempfile.mkdtemp')
    @mock.patch('multiprocessing.managers.BaseManager.get_server',
                side_effect=DaemonCleanupException)
    def test_daemon_no_cleanup_for_uninitialized_server(self, gs, mkd, *args):
        mkd.return_value = '/just_dir/123'
        self.assertRaises(DaemonCleanupException, daemon.daemon_start,
                          config=None, filters=None)
