# Copyright 2012-2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test GridFS with Motor, an asynchronous driver for MongoDB and Tornado."""

import datetime
import sys
import traceback
import unittest
from test import MockRequestHandler
from test.asyncio_tests import AsyncIOTestCase, asyncio_test

from bson.objectid import ObjectId
from gridfs.errors import NoFile
from pymongo.errors import InvalidOperation

from motor import motor_asyncio


class MotorGridFileTest(AsyncIOTestCase):
    async def _reset(self):
        """Drop the ``fs`` and ``alt`` GridFS collections used by these tests."""
        for collection_name in ("fs.files", "fs.chunks", "alt.files", "alt.chunks"):
            await self.db.drop_collection(collection_name)

    def tearDown(self):
        """Drop the GridFS collections on the test loop, then run the base teardown."""
        cleanup = self._reset()
        self.loop.run_until_complete(cleanup)
        super().tearDown()

    @asyncio_test
    async def test_attributes(self):
        """File attributes of a GridOut are readable only after ``open()``.

        Before ``open()`` has been awaited, reading any of the attributes must
        raise ``InvalidOperation``; afterwards the same reads must not raise.
        """
        f = motor_asyncio.AsyncIOMotorGridIn(
            self.db.fs, filename="test", foo="bar", content_type="text"
        )

        await f.close()

        g = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, f._id)
        # Each attribute is listed exactly once so that no check is repeated.
        attr_names = (
            "_id",
            "filename",
            "name",
            "content_type",
            "length",
            "chunk_size",
            "upload_date",
            "aliases",
            "metadata",
        )

        for attr_name in attr_names:
            self.assertRaises(InvalidOperation, getattr, g, attr_name)

        await g.open()
        for attr_name in attr_names:
            # Must not raise now that open() has completed.
            getattr(g, attr_name)

    @asyncio_test
    async def test_iteration(self):
        """Calling ``iter()`` on a GridOut must raise ``TypeError``."""
        bucket = motor_asyncio.AsyncIOMotorGridFSBucket(self.db)
        file_id = await bucket.upload_from_stream("filename", b"foo")
        grid_out = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, file_id)

        # Synchronous iteration is prohibited.
        with self.assertRaises(TypeError):
            iter(grid_out)

    @asyncio_test
    async def test_basic(self):
        """Round-trip a non-empty file and an empty file through GridIn/GridOut."""
        # A file with content is expected to yield one files document and one chunk.
        writer = motor_asyncio.AsyncIOMotorGridIn(self.db.fs, filename="test")
        await writer.write(b"hello world")
        await writer.close()
        n_files = await self.db.fs.files.count_documents({})
        self.assertEqual(1, n_files)
        n_chunks = await self.db.fs.chunks.count_documents({})
        self.assertEqual(1, n_chunks)

        reader = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, writer._id)
        content = await reader.read()
        self.assertEqual(b"hello world", content)

        # An empty file is expected to add a files document but no chunk.
        writer = motor_asyncio.AsyncIOMotorGridIn(self.db.fs, filename="test")
        await writer.close()
        n_files = await self.db.fs.files.count_documents({})
        self.assertEqual(2, n_files)
        n_chunks = await self.db.fs.chunks.count_documents({})
        self.assertEqual(1, n_chunks)

        reader = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, writer._id)
        content = await reader.read()
        self.assertEqual(b"", content)

    @asyncio_test
    async def test_readchunk(self):
        """Interleave ``readchunk()`` and ``read(n)`` on a file with 3-byte chunks."""
        payload = b"a" * 10
        writer = motor_asyncio.AsyncIOMotorGridIn(self.db.fs, chunkSize=3)
        await writer.write(payload)
        await writer.close()

        reader = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, writer._id)

        # Expected layout for 10 bytes at chunkSize=3: 3 + 3 + 3 + 1.
        data = await reader.readchunk()
        self.assertEqual(3, len(data))

        # After a partial read, readchunk() is expected to return the rest
        # of the current chunk only.
        data = await reader.read(2)
        self.assertEqual(2, len(data))
        data = await reader.readchunk()
        self.assertEqual(1, len(data))

        data = await reader.read(3)
        self.assertEqual(3, len(data))

        data = await reader.readchunk()
        self.assertEqual(1, len(data))

        # Nothing is left once the end of the file has been reached.
        data = await reader.readchunk()
        self.assertEqual(0, len(data))

    @asyncio_test
    async def test_gridout_open_exc_info(self):
        """``open()`` on a missing file raises ``NoFile`` with a useful traceback.

        The test fails explicitly when no exception is raised, so it cannot
        pass without ever inspecting the traceback.
        """
        g = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, "_id that doesn't exist")
        try:
            await g.open()
        except NoFile:
            # Read the traceback here, while the exception is still being handled.
            _, _, tb = sys.exc_info()

            # The call tree should include PyMongo code we ran on a thread.
            formatted = "\n".join(traceback.format_tb(tb))
            self.assertIn("open", formatted)
        else:
            self.fail("NoFile was not raised when opening a nonexistent file")

    @asyncio_test
    async def test_alternate_collection(self):
        """GridIn/GridOut work against the ``alt`` root collection instead of ``fs``."""
        # Start from empty alt.files / alt.chunks collections.
        for part in (self.db.alt.files, self.db.alt.chunks):
            await part.delete_many({})

        writer = motor_asyncio.AsyncIOMotorGridIn(self.db.alt)
        await writer.write(b"hello world")
        await writer.close()

        n_files = await self.db.alt.files.count_documents({})
        self.assertEqual(1, n_files)
        n_chunks = await self.db.alt.chunks.count_documents({})
        self.assertEqual(1, n_chunks)

        reader = motor_asyncio.AsyncIOMotorGridOut(self.db.alt, writer._id)
        content = await reader.read()
        self.assertEqual(b"hello world", content)

    @asyncio_test
    async def test_grid_in_default_opts(self):
        """Check GridIn attribute defaults and mutability before and after ``close()``."""
        # The first argument must be a collection, not a string.
        self.assertRaises(TypeError, motor_asyncio.AsyncIOMotorGridIn, "foo")

        a = motor_asyncio.AsyncIOMotorGridIn(self.db.fs)

        self.assertIsInstance(a._id, ObjectId)
        self.assertRaises(AttributeError, setattr, a, "_id", 5)

        self.assertIsNone(a.filename)

        # This raises AttributeError because you can't directly set properties
        # in Motor, have to use set()
        with self.assertRaises(AttributeError):
            a.filename = "my_file"

        # This method of setting attributes works in Motor
        await a.set("filename", "my_file")
        self.assertEqual("my_file", a.filename)

        self.assertIsNone(a.content_type)
        await a.set("content_type", "text/html")
        self.assertEqual("text/html", a.content_type)

        # Before close(), length and upload_date are expected to be unreadable.
        self.assertRaises(AttributeError, getattr, a, "length")
        self.assertRaises(AttributeError, setattr, a, "length", 5)

        self.assertEqual(255 * 1024, a.chunk_size)
        self.assertRaises(AttributeError, setattr, a, "chunk_size", 5)

        self.assertRaises(AttributeError, getattr, a, "upload_date")
        self.assertRaises(AttributeError, setattr, a, "upload_date", 5)

        self.assertRaises(AttributeError, getattr, a, "aliases")
        await a.set("aliases", ["foo"])
        self.assertEqual(["foo"], a.aliases)

        self.assertRaises(AttributeError, getattr, a, "metadata")
        await a.set("metadata", {"foo": 1})
        self.assertEqual({"foo": 1}, a.metadata)

        await a.close()

        # After close(), earlier values persist and length/upload_date are readable.
        self.assertIsInstance(a._id, ObjectId)
        self.assertRaises(AttributeError, setattr, a, "_id", 5)

        self.assertEqual("my_file", a.filename)

        self.assertEqual("text/html", a.content_type)

        self.assertEqual(0, a.length)
        self.assertRaises(AttributeError, setattr, a, "length", 5)

        self.assertEqual(255 * 1024, a.chunk_size)
        self.assertRaises(AttributeError, setattr, a, "chunk_size", 5)

        self.assertIsInstance(a.upload_date, datetime.datetime)
        self.assertRaises(AttributeError, setattr, a, "upload_date", 5)

        self.assertEqual(["foo"], a.aliases)

        self.assertEqual({"foo": 1}, a.metadata)

    @asyncio_test
    async def test_grid_in_custom_opts(self):
        """Options passed to the GridIn constructor are exposed as attributes."""
        with self.assertRaises(TypeError):
            motor_asyncio.AsyncIOMotorGridIn("foo")

        # camelCase spellings of the content type and chunk size options.
        a = motor_asyncio.AsyncIOMotorGridIn(
            self.db.fs,
            _id=5,
            filename="my_file",
            contentType="text/html",
            chunkSize=1000,
            aliases=["foo"],
            metadata={"foo": 1, "bar": 2},
            bar=3,
            baz="hello",
        )

        expected_on_a = (
            ("_id", 5),
            ("filename", "my_file"),
            ("content_type", "text/html"),
            ("chunk_size", 1000),
            ("aliases", ["foo"]),
            ("metadata", {"foo": 1, "bar": 2}),
            ("bar", 3),
            ("baz", "hello"),
        )
        for attr_name, expected in expected_on_a:
            self.assertEqual(expected, getattr(a, attr_name))

        # A field that was never supplied is not readable.
        with self.assertRaises(AttributeError):
            a.mike

        # snake_case spellings of the same options.
        b = motor_asyncio.AsyncIOMotorGridIn(
            self.db.fs, content_type="text/html", chunk_size=1000, baz=100
        )

        expected_on_b = (
            ("content_type", "text/html"),
            ("chunk_size", 1000),
            ("baz", 100),
        )
        for attr_name, expected in expected_on_b:
            self.assertEqual(expected, getattr(b, attr_name))

    @asyncio_test
    async def test_grid_out_default_opts(self):
        """Check the attributes of a GridOut for a file written with no options."""
        # The first argument must be a collection, not a string.
        self.assertRaises(TypeError, motor_asyncio.AsyncIOMotorGridOut, "foo")

        # Opening an id that was never written must fail.
        gout = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, 5)
        with self.assertRaises(NoFile):
            await gout.open()

        a = motor_asyncio.AsyncIOMotorGridIn(self.db.fs)
        await a.close()

        b = await motor_asyncio.AsyncIOMotorGridOut(self.db.fs, a._id).open()

        self.assertEqual(a._id, b._id)
        self.assertEqual(0, b.length)
        self.assertIsNone(b.content_type)
        self.assertEqual(255 * 1024, b.chunk_size)
        self.assertIsInstance(b.upload_date, datetime.datetime)
        self.assertIsNone(b.aliases)
        self.assertIsNone(b.metadata)

    @asyncio_test
    async def test_grid_out_custom_opts(self):
        """Options given when writing a file are visible on the GridOut that reads it.

        Every option passed to the GridIn, including ``filename`` and the
        custom field ``baz``, is verified on the read side.
        """
        one = motor_asyncio.AsyncIOMotorGridIn(
            self.db.fs,
            _id=5,
            filename="my_file",
            contentType="text/html",
            chunkSize=1000,
            aliases=["foo"],
            metadata={"foo": 1, "bar": 2},
            bar=3,
            baz="hello",
        )

        await one.write(b"hello world")
        await one.close()

        two = await motor_asyncio.AsyncIOMotorGridOut(self.db.fs, 5).open()

        self.assertEqual(5, two._id)
        self.assertEqual("my_file", two.filename)
        # 11 == len(b"hello world")
        self.assertEqual(11, two.length)
        self.assertEqual("text/html", two.content_type)
        self.assertEqual(1000, two.chunk_size)
        self.assertIsInstance(two.upload_date, datetime.datetime)
        self.assertEqual(["foo"], two.aliases)
        self.assertEqual({"foo": 1, "bar": 2}, two.metadata)
        self.assertEqual(3, two.bar)
        self.assertEqual("hello", two.baz)

    @asyncio_test
    async def test_grid_out_file_document(self):
        """A GridOut can be built from an already-fetched ``fs.files`` document."""
        writer = motor_asyncio.AsyncIOMotorGridIn(self.db.fs)
        await writer.write(b"foo bar")
        await writer.close()

        # Keyword form: only the file document is supplied.
        doc = await self.db.fs.files.find_one()
        by_keyword = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, file_document=doc)
        content = await by_keyword.read()
        self.assertEqual(b"foo bar", content)

        # Positional form: the stored content is still expected even though
        # 5 is passed as the file id alongside the document.
        doc = await self.db.fs.files.find_one()
        by_position = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, 5, doc)
        content = await by_position.read()
        self.assertEqual(b"foo bar", content)

        # An empty document must be rejected when the file is opened.
        from_empty = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, file_document={})
        with self.assertRaises(NoFile):
            await from_empty.open()

    @asyncio_test
    async def test_write_file_like(self):
        """``GridIn.write()`` accepts a GridOut as its data source."""
        # Create the source file.
        source_in = motor_asyncio.AsyncIOMotorGridIn(self.db.fs)
        await source_in.write(b"hello world")
        await source_in.close()

        # Copy it by handing the reader straight to a second writer.
        source_out = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, source_in._id)
        copy_in = motor_asyncio.AsyncIOMotorGridIn(self.db.fs)
        await copy_in.write(source_out)
        await copy_in.close()

        # The copy must hold the same bytes as the source.
        copy_out = motor_asyncio.AsyncIOMotorGridOut(self.db.fs, copy_in._id)
        copied = await copy_out.read()
        self.assertEqual(b"hello world", copied)

    @asyncio_test
    async def test_set_after_close(self):
        """Custom fields: plain assignment before ``close()``, ``set()`` afterwards."""
        grid_in = motor_asyncio.AsyncIOMotorGridIn(self.db.fs, _id="foo", bar="baz")

        self.assertEqual("foo", grid_in._id)
        self.assertEqual("baz", grid_in.bar)
        # Fields that were never supplied are not readable yet.
        for missing in ("baz", "uploadDate"):
            self.assertRaises(AttributeError, getattr, grid_in, missing)
        self.assertRaises(AttributeError, setattr, grid_in, "_id", 5)

        # While the file is still open, custom fields accept direct assignment.
        grid_in.bar = "foo"
        grid_in.baz = 5

        self.assertEqual("foo", grid_in.bar)
        self.assertEqual(5, grid_in.baz)
        self.assertRaises(AttributeError, getattr, grid_in, "uploadDate")

        await grid_in.close()

        self.assertEqual("foo", grid_in._id)
        self.assertEqual("foo", grid_in.bar)
        self.assertEqual(5, grid_in.baz)
        # uploadDate is expected to be truthy once the file has been closed.
        self.assertTrue(grid_in.uploadDate)

        self.assertRaises(AttributeError, setattr, grid_in, "_id", 5)
        # After close(), fields are changed through set().
        await grid_in.set("bar", "a")
        await grid_in.set("baz", "b")
        self.assertRaises(AttributeError, setattr, grid_in, "upload_date", 5)

        # The values written with set() must be visible to a fresh reader.
        grid_out = await motor_asyncio.AsyncIOMotorGridOut(self.db.fs, grid_in._id).open()
        self.assertEqual("a", grid_out.bar)
        self.assertEqual("b", grid_out.baz)

    @asyncio_test
    async def test_stream_to_handler(self):
        """``stream_to_handler()`` writes as many bytes as the file contains."""
        bucket = motor_asyncio.AsyncIOMotorGridFSBucket(self.db)
        sizes = (0, 1, 100, 100 * 1000)

        for size in sizes:
            payload = b"a" * size
            file_id = await bucket.upload_from_stream("filename", payload)
            stream = await bucket.open_download_stream(file_id)
            mock_handler = MockRequestHandler()
            await stream.stream_to_handler(mock_handler)
            self.assertEqual(size, mock_handler.n_written)
            # Remove the file before the next size is uploaded.
            await bucket.delete(file_id)


# Allow this test module to be executed directly as a script.
if __name__ == "__main__":
    unittest.main()
