from packaging import version
import warnings

from . import CollectionInvalid
from . import InvalidName
from . import OperationFailure
from .collection import Collection
from .filtering import filter_applies
from mongomock import codec_options as mongomock_codec_options
from mongomock import helpers
from mongomock import read_preferences
from mongomock import store

try:
    from pymongo import ReadPreference
    _READ_PREFERENCE_PRIMARY = ReadPreference.PRIMARY
except ImportError:
    _READ_PREFERENCE_PRIMARY = read_preferences.PRIMARY

try:
    from pymongo.read_concern import ReadConcern
except ImportError:
    from .read_concern import ReadConcern

_LIST_COLLECTION_FILTER_ALLOWED_OPERATORS = frozenset(['$regex', '$eq', '$ne'])


def _verify_list_collection_supported_op(keys):
    if set(keys) - _LIST_COLLECTION_FILTER_ALLOWED_OPERATORS:
        raise NotImplementedError(
            'list collection names filter operator {} is not implemented yet in mongomock '
            'allowed operators are {}'.format(keys, _LIST_COLLECTION_FILTER_ALLOWED_OPERATORS))


class Database:
    """Mongomock counterpart of a pymongo ``Database``.

    A database is identified by its client and its name. Collection data lives in a
    store object that may be shared by several ``Database`` instances (see
    ``with_options``), whereas the ``Collection`` wrappers returned by
    ``get_collection`` are cached per instance.
    """

    def __init__(
        self, client, name, _store, read_preference=None, codec_options=None, read_concern=None
    ):
        """Create a database handle.

        Args:
            client: the client this database belongs to.
            name: the database name.
            _store: backing store; a fresh ``store.DatabaseStore`` is created when falsy.
            read_preference: defaults to the primary read preference.
            codec_options: defaults to ``mongomock_codec_options.CodecOptions()``.
            read_concern: defaults to ``ReadConcern()``.

        Raises:
            TypeError: if ``read_concern`` is truthy but not a ``ReadConcern`` instance.
        """
        self.name = name
        self._client = client
        # Cache of Collection wrappers already handed out, keyed by collection name.
        self._collection_accesses = {}
        self._store = _store or store.DatabaseStore()
        self._read_preference = read_preference or _READ_PREFERENCE_PRIMARY
        # NOTE(review): is_supported presumably raises on unsupported options - confirm.
        mongomock_codec_options.is_supported(codec_options)
        self._codec_options = codec_options or mongomock_codec_options.CodecOptions()
        if read_concern and not isinstance(read_concern, ReadConcern):
            raise TypeError('read_concern must be an instance of pymongo.read_concern.ReadConcern')
        self._read_concern = read_concern or ReadConcern()

    def __getitem__(self, coll_name):
        """Return the collection called ``coll_name`` (same as ``get_collection``)."""
        return self.get_collection(coll_name)

    def __getattr__(self, attr):
        """Expose collections as attributes, except for underscore-prefixed names.

        Raises:
            AttributeError: if ``attr`` starts with an underscore.
        """
        if attr.startswith('_'):
            raise AttributeError(
                "%s has no attribute '%s'. To access the %s collection, use database['%s']." %
                (self.__class__.__name__, attr, attr, attr))
        return self[attr]

    def __repr__(self):
        return f"Database({self._client}, '{self.name}')"

    def __eq__(self, other):
        """Two databases are equal when they share the same client and name."""
        if isinstance(other, self.__class__):
            return self._client == other._client and self.name == other.name
        return NotImplemented

    # __hash__ is only provided when running against pymongo >= 3.12; otherwise defining
    # __eq__ alone leaves instances unhashable.
    if helpers.PYMONGO_VERSION >= version.parse('3.12'):
        def __hash__(self):
            return hash((self._client, self.name))

    @property
    def client(self):
        """The client this database belongs to."""
        return self._client

    @property
    def read_preference(self):
        """The read preference of this database."""
        return self._read_preference

    @property
    def codec_options(self):
        """The codec options of this database."""
        return self._codec_options

    @property
    def read_concern(self):
        """The read concern of this database."""
        return self._read_concern

    def _get_created_collections(self):
        """Return the names of the collections the store reports as created."""
        return self._store.list_created_collection_names()

    # collection_names is only exposed when running against pymongo < 4.0.
    if helpers.PYMONGO_VERSION < version.parse('4.0'):
        def collection_names(self, include_system_collections=True, session=None):
            """Deprecated way of listing collection names.

            Args:
                include_system_collections: when true, every created collection is
                    returned; otherwise ``list_collection_names`` is used, which skips
                    the ``system.`` collections.
                session: forwarded to ``list_collection_names``.
            """
            warnings.warn('collection_names is deprecated. Use list_collection_names instead.')
            if include_system_collections:
                return list(self._get_created_collections())
            return self.list_collection_names(session=session)

    def list_collections(self, filter=None, session=None, nameOnly=False):
        """Not implemented in mongomock; always raises ``NotImplementedError``."""
        raise NotImplementedError(
            'list_collections is a valid method of Database but has not been implemented in '
            'mongomock yet.')

    def list_collection_names(self, filter=None, session=None):
        """List the created, non-system collections of this database.

        Args:
            filter: optional filter on the ``name`` field only. Its value is either a
                string (exact match) or a dict of operators taken from
                ``_LIST_COLLECTION_FILTER_ALLOWED_OPERATORS``.
            session: not supported.

        Returns:
            A list of collection names; names starting with ``system.`` are never included.

        Raises:
            NotImplementedError: if a session is given, if the filter has no usable
                ``name`` clause, or if it uses an unsupported operator.
        """
        field_name = 'name'

        if session:
            raise NotImplementedError('Mongomock does not handle sessions yet')

        # Both the filtered and unfiltered listings start from the same set: collections
        # that are actually created, minus the system ones. Store entries that were only
        # accessed, or that have been dropped, must not show up just because a filter
        # was supplied.
        visible_names = [
            name for name in self._get_created_collections()
            if not name.startswith('system.')
        ]
        if not filter:
            return visible_names

        name_clause = filter.get(field_name)
        if not name_clause:
            raise NotImplementedError('list collection {} might be valid but is not '
                                      'implemented yet in mongomock'.format(filter))

        # A bare string is shorthand for an equality match.
        if isinstance(name_clause, str):
            filter = {field_name: {'$eq': name_clause}}

        _verify_list_collection_supported_op(filter.get(field_name).keys())

        return [
            name for name in visible_names
            if filter_applies(filter, {field_name: name})
        ]

    def get_collection(self, name, codec_options=None, read_preference=None,
                       write_concern=None, read_concern=None):
        """Return the collection called ``name``, creating its wrapper on first access.

        Options that are not given fall back to the database's codec options and read
        preference. The first wrapper created for a name is cached; later calls derive
        their result from it through ``Collection.with_options``.

        Raises:
            TypeError: if ``name`` is not a string.
            InvalidName: if ``name`` is not a valid collection name.
        """
        if read_preference is not None:
            read_preferences.ensure_read_preference_type('read_preference', read_preference)
        mongomock_codec_options.is_supported(codec_options)

        effective_codec_options = codec_options or self._codec_options
        effective_read_preference = read_preference or self.read_preference

        # Keep the try body down to the cache lookup so that a KeyError raised while
        # applying the options is not mistaken for a cache miss.
        try:
            cached = self._collection_accesses[name]
        except KeyError:
            self._ensure_valid_collection_name(name)
            collection = self._collection_accesses[name] = Collection(
                self, name=name, read_concern=read_concern, write_concern=write_concern,
                read_preference=effective_read_preference,
                codec_options=effective_codec_options, _db_store=self._store)
            return collection

        return cached.with_options(
            codec_options=effective_codec_options,
            read_preference=effective_read_preference,
            read_concern=read_concern, write_concern=write_concern)

    def drop_collection(self, name_or_collection, session=None):
        """Drop a collection given either its name or a ``Collection`` object.

        Raises:
            NotImplementedError: if a session is given.
        """
        if session:
            raise NotImplementedError('Mongomock does not handle sessions yet')
        if isinstance(name_or_collection, Collection):
            name_or_collection._store.drop()
        else:
            self._store[name_or_collection].drop()

    def _ensure_valid_collection_name(self, name):
        """Validate a collection name.

        Raises:
            TypeError: if ``name`` is not a string.
            InvalidName: if ``name`` is empty, contains ``..``, starts or ends with a
                dot, or contains ``$`` or the null character.
        """
        # These are the same checks that are done in pymongo.
        if not isinstance(name, str):
            raise TypeError('name must be an instance of str')
        if not name or '..' in name:
            raise InvalidName('collection names cannot be empty')
        if name[0] == '.' or name[-1] == '.':
            raise InvalidName("collection names must not start or end with '.'")
        if '$' in name:
            raise InvalidName("collection names must not contain '$'")
        if '\x00' in name:
            raise InvalidName('collection names must not contain the null character')

    def create_collection(self, name, **kwargs):
        """Explicitly create the collection called ``name`` and return it.

        Raises:
            TypeError: if ``name`` is not a string.
            InvalidName: if ``name`` is not a valid collection name.
            CollectionInvalid: if the collection is already listed by
                ``list_collection_names``.
            NotImplementedError: if any extra option is passed.
        """
        self._ensure_valid_collection_name(name)
        if name in self.list_collection_names():
            raise CollectionInvalid('collection %s already exists' % name)

        if kwargs:
            raise NotImplementedError('Special options not supported')

        self._store.create_collection(name)
        return self[name]

    def rename_collection(self, name, new_name, dropTarget=False):
        """Change the name of an existing collection.

        Args:
            name: current name of the collection.
            new_name: name to give to the collection.
            dropTarget: when true, an existing collection called ``new_name`` is dropped
                first instead of causing a failure.

        Returns:
            ``{'ok': 1}`` on success.

        Raises:
            TypeError: if ``new_name`` is not a string.
            InvalidName: if ``new_name`` is not a valid collection name.
            OperationFailure: with code 10026 if the source collection does not exist,
                or code 10027 if the target exists and ``dropTarget`` is false.
        """
        self._ensure_valid_collection_name(new_name)

        # Reference for server implementation:
        # https://docs.mongodb.com/manual/reference/command/renameCollection/
        if not self._store[name].is_created:
            raise OperationFailure(
                f'The collection "{name}" does not exist.', 10026)
        if new_name in self._store:
            if dropTarget:
                self.drop_collection(new_name)
            else:
                raise OperationFailure(
                    f'The target collection "{new_name}" already exists',
                    10027)
        self._store.rename(name, new_name)
        return {'ok': 1}

    def dereference(self, dbref, session=None):
        """Return the document a DBRef-like object points to.

        Args:
            dbref: object exposing ``collection``, ``id`` and ``database`` attributes.
            session: not supported.

        Returns:
            The result of ``find_one`` on the referenced collection for that ``_id``.

        Raises:
            NotImplementedError: if a session is given.
            TypeError: if ``dbref`` lacks a ``collection`` or an ``id`` attribute.
            ValueError: if ``dbref`` names another database.
        """
        if session:
            raise NotImplementedError('Mongomock does not handle sessions yet')

        if not hasattr(dbref, 'collection') or not hasattr(dbref, 'id'):
            raise TypeError('cannot dereference a %s' % type(dbref))
        if dbref.database is not None and dbref.database != self.name:
            raise ValueError('trying to dereference a DBRef that points to '
                             'another database (%r not %r)' % (dbref.database,
                                                               self.name))
        return self[dbref.collection].find_one({'_id': dbref.id})

    def command(self, command, **unused_kwargs):
        """Run a database command; only ``ping`` is supported.

        Args:
            command: a command name, or a mapping whose keys include the command name.

        Returns:
            ``{'ok': 1.}`` for ``ping``.

        Raises:
            NotImplementedError: for any other command.
        """
        if isinstance(command, str):
            command = {command: 1}
        if 'ping' in command:
            return {'ok': 1.}
        # TODO(pascal): Differentiate NotImplementedError for valid commands
        # and OperationFailure if the command is not valid.
        raise NotImplementedError(
            'command is a valid Database method but is not implemented in Mongomock yet')

    def with_options(
            self, codec_options=None, read_preference=None, write_concern=None, read_concern=None):
        """Return a database sharing this one's store, with some options overridden.

        ``self`` is returned when no option actually differs from the current ones;
        otherwise a new ``Database`` is built on the same client, name and store.

        Raises:
            NotImplementedError: if ``write_concern`` is given.
            TypeError: if ``read_concern`` is truthy but not a ``ReadConcern`` instance.
        """
        mongomock_codec_options.is_supported(codec_options)

        if write_concern:
            raise NotImplementedError(
                'write_concern is a valid parameter for with_options but is not implemented yet in '
                'mongomock')

        # Every overridable option has to be unchanged before this instance can be
        # reused; checking read_preference alone would silently drop a requested
        # codec_options or read_concern.
        requested_and_current = (
            (read_preference, self._read_preference),
            (codec_options, self._codec_options),
            (read_concern, self._read_concern),
        )
        if all(
                requested is None or requested == current
                for requested, current in requested_and_current):
            return self

        return Database(
            self._client, self.name, self._store,
            read_preference=read_preference or self._read_preference,
            codec_options=codec_options or self._codec_options,
            read_concern=read_concern or self._read_concern,
        )
