# coding: utf-8
#
# Copyright 2014 Thomas Amland <thomas.amland@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import os
import pickle
import time
from unittest.mock import patch

from watchdog.utils.dirsnapshot import DirectorySnapshot
from watchdog.utils.dirsnapshot import DirectorySnapshotDiff
from watchdog.utils.dirsnapshot import EmptyDirectorySnapshot
from watchdog.utils import platform

from .shell import mkdir, touch, mv, rm


def wait():
    """
    Wait long enough for file/folder mtime to change. This is needed
    to be able to detected modifications.
    """
    if platform.is_darwin() or platform.is_windows():
        # on macOS resolution of stat.mtime is only 1 second
        time.sleep(1.5)
    else:
        time.sleep(0.5)


def test_pickle(p):
    """It should be possible to pickle a snapshot."""
    mkdir(p('dir1'))
    snasphot = DirectorySnapshot(p('dir1'))
    pickle.dumps(snasphot)


def test_move_to(p):
    mkdir(p('dir1'))
    mkdir(p('dir2'))
    touch(p('dir1', 'a'))
    ref = DirectorySnapshot(p('dir2'))
    mv(p('dir1', 'a'), p('dir2', 'b'))
    diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('dir2')))
    assert diff.files_created == [p('dir2', 'b')]


def test_move_from(p):
    mkdir(p('dir1'))
    mkdir(p('dir2'))
    touch(p('dir1', 'a'))
    ref = DirectorySnapshot(p('dir1'))
    mv(p('dir1', 'a'), p('dir2', 'b'))
    diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('dir1')))
    assert diff.files_deleted == [p('dir1', 'a')]


def test_move_internal(p):
    mkdir(p('dir1'))
    mkdir(p('dir2'))
    touch(p('dir1', 'a'))
    ref = DirectorySnapshot(p(''))
    mv(p('dir1', 'a'), p('dir2', 'b'))
    diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('')))
    assert diff.files_moved == [(p('dir1', 'a'), p('dir2', 'b'))]
    assert diff.files_created == []
    assert diff.files_deleted == []


def test_move_replace(p):
    mkdir(p('dir1'))
    mkdir(p('dir2'))
    touch(p('dir1', 'a'))
    touch(p('dir2', 'b'))
    ref = DirectorySnapshot(p(''))
    mv(p('dir1', 'a'), p('dir2', 'b'))
    diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('')))
    assert diff.files_moved == [(p('dir1', 'a'), p('dir2', 'b'))]
    assert diff.files_deleted == [p('dir2', 'b')]
    assert diff.files_created == []


def test_dir_modify_on_create(p):
    ref = DirectorySnapshot(p(''))
    wait()
    touch(p('a'))
    diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('')))
    assert diff.dirs_modified == [p('')]


def test_dir_modify_on_move(p):
    mkdir(p('dir1'))
    mkdir(p('dir2'))
    touch(p('dir1', 'a'))
    ref = DirectorySnapshot(p(''))
    wait()
    mv(p('dir1', 'a'), p('dir2', 'b'))
    diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('')))
    assert set(diff.dirs_modified) == {p('dir1'), p('dir2')}


def test_detect_modify_for_moved_files(p):
    touch(p('a'))
    ref = DirectorySnapshot(p(''))
    wait()
    touch(p('a'))
    mv(p('a'), p('b'))
    diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('')))
    assert diff.files_moved == [(p('a'), p('b'))]
    assert diff.files_modified == [p('a')]


def test_replace_dir_with_file(p):
    # Replace a dir with a file of the same name just before the normal listdir
    # call and ensure it doesn't cause an exception

    def listdir_fcn(path):
        if path == p("root", "dir"):
            rm(path, recursive=True)
            touch(path)
        return os.scandir(path)

    mkdir(p('root'))
    mkdir(p('root', 'dir'))

    # Should NOT raise an OSError (ENOTDIR)
    DirectorySnapshot(p('root'), listdir=listdir_fcn)


def test_permission_error(p):
    # Test that unreadable folders are not raising exceptions
    mkdir(p('a', 'b', 'c'), parents=True)

    ref = DirectorySnapshot(p(''))
    walk_orig = DirectorySnapshot.walk

    def walk(self, root):
        """Generate a permission error on folder "a/b"."""
        # Generate the permission error
        if root.startswith(p('a', 'b')):
            raise OSError(errno.EACCES, os.strerror(errno.EACCES))

        # Mimic the original method
        yield from walk_orig(self, root)

    with patch.object(DirectorySnapshot, "walk", new=walk):
        # Should NOT raise an OSError (EACCES)
        new_snapshot = DirectorySnapshot(p(''))

    diff = DirectorySnapshotDiff(ref, new_snapshot)
    assert repr(diff)

    # Children of a/b/ are no more accessible and so removed in the new snapshot
    assert diff.dirs_deleted == [(p('a', 'b', 'c'))]


def test_ignore_device(p):
    # Create a file and take a snapshot.
    touch(p('file'))
    ref = DirectorySnapshot(p(''))
    wait()

    inode_orig = DirectorySnapshot.inode

    def inode(self, path):
        # This function will always return a different device_id,
        # even for the same file.
        result = inode_orig(self, path)
        inode.times += 1
        return result[0], result[1] + inode.times

    inode.times = 0

    # Set the custom inode function.
    with patch.object(DirectorySnapshot, 'inode', new=inode):
        # If we make the diff of the same directory, since by default the
        # DirectorySnapshotDiff compares the snapshots using the device_id (and it will
        # be different), it thinks that the same file has been deleted and created again.
        snapshot = DirectorySnapshot(p(''))
        diff_with_device = DirectorySnapshotDiff(ref, snapshot)
        assert diff_with_device.files_deleted == [(p('file'))]
        assert diff_with_device.files_created == [(p('file'))]

        # Otherwise, if we choose to ignore the device, the file will not be detected as
        # deleted and re-created.
        snapshot = DirectorySnapshot(p(''))
        diff_without_device = DirectorySnapshotDiff(ref, snapshot, ignore_device=True)
        assert diff_without_device.files_deleted == []
        assert diff_without_device.files_created == []


def test_empty_snapshot(p):
    # Create a file and declare a DirectorySnapshot and a EmptyDirectorySnapshot.
    # When we make the diff, although both objects were declared with the same items on
    # the directory, the file and directories created BEFORE the DirectorySnapshot will
    # be detected as newly created.

    touch(p('a'))
    mkdir(p('b', 'c'), parents=True)
    ref = DirectorySnapshot(p(''))
    empty = EmptyDirectorySnapshot()

    diff = DirectorySnapshotDiff(empty, ref)
    assert diff.files_created == [p('a')]
    assert sorted(diff.dirs_created) == sorted([p(''), p('b'), p('b', 'c')])
