# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
test_object
----------------------------------

Functional tests for object methods.
"""

import random
import string
import tempfile

from testtools import content

from openstack.tests.functional import base


class TestObject(base.BaseFunctionalTest):
    def setUp(self):
        super().setUp()
        if not self.user_cloud.has_service('object-store'):
            self.skipTest('Object service not supported by cloud')

    def test_create_object(self):
        '''Test uploading small and large files.'''
        container_name = self.getUniqueString('container')
        self.addDetail('container', content.text_content(container_name))
        self.addCleanup(self.user_cloud.delete_container, container_name)
        self.user_cloud.create_container(container_name)
        container = self.user_cloud.get_container(container_name)
        self.assertEqual(container_name, container.name)
        self.assertEqual(
            [], self.user_cloud.list_containers(prefix='somethin')
        )
        sizes = (
            (64 * 1024, 1),  # 64K, one segment
            (64 * 1024, 5),  # 64MB, 5 segments
        )
        for size, nseg in sizes:
            segment_size = int(round(size / nseg))
            with tempfile.NamedTemporaryFile() as fake_file:
                fake_content = ''.join(
                    random.SystemRandom().choice(
                        string.ascii_uppercase + string.digits
                    )
                    for _ in range(size)
                ).encode('latin-1')

                fake_file.write(fake_content)
                fake_file.flush()
                name = f'test-{size}'
                self.addCleanup(
                    self.user_cloud.delete_object, container_name, name
                )
                self.user_cloud.create_object(
                    container_name,
                    name,
                    fake_file.name,
                    segment_size=segment_size,
                    metadata={'foo': 'bar'},
                )
                self.assertFalse(
                    self.user_cloud.is_object_stale(
                        container_name, name, fake_file.name
                    )
                )
            self.assertEqual(
                'bar',
                self.user_cloud.get_object_metadata(container_name, name)[
                    'foo'
                ],
            )
            self.user_cloud.update_object(
                container=container_name,
                name=name,
                metadata={'testk': 'testv'},
            )
            self.assertEqual(
                'testv',
                self.user_cloud.get_object_metadata(container_name, name)[
                    'testk'
                ],
            )
            self.assertIsNotNone(
                self.user_cloud.get_object(container_name, name)
            )
            self.assertEqual(
                name, self.user_cloud.list_objects(container_name)[0]['name']
            )
            self.assertEqual(
                [], self.user_cloud.list_objects(container_name, prefix='abc')
            )
            self.assertTrue(
                self.user_cloud.delete_object(container_name, name)
            )
        self.assertEqual([], self.user_cloud.list_objects(container_name))
        self.assertEqual(
            container_name, self.user_cloud.get_container(container_name).name
        )
        self.user_cloud.delete_container(container_name)

    def test_download_object_to_file(self):
        '''Test uploading small and large files.'''
        container_name = self.getUniqueString('container')
        self.addDetail('container', content.text_content(container_name))
        self.addCleanup(self.user_cloud.delete_container, container_name)
        self.user_cloud.create_container(container_name)
        self.assertEqual(
            container_name, self.user_cloud.list_containers()[0]['name']
        )
        sizes = (
            (64 * 1024, 1),  # 64K, one segment
            (64 * 1024, 5),  # 64MB, 5 segments
        )
        for size, nseg in sizes:
            fake_content = b''
            segment_size = int(round(size / nseg))
            with tempfile.NamedTemporaryFile() as fake_file:
                fake_content = ''.join(
                    random.SystemRandom().choice(
                        string.ascii_uppercase + string.digits
                    )
                    for _ in range(size)
                ).encode('latin-1')

                fake_file.write(fake_content)
                fake_file.flush()
                name = f'test-{size}'
                self.addCleanup(
                    self.user_cloud.delete_object, container_name, name
                )
                self.user_cloud.create_object(
                    container_name,
                    name,
                    fake_file.name,
                    segment_size=segment_size,
                    metadata={'foo': 'bar'},
                )
                self.assertFalse(
                    self.user_cloud.is_object_stale(
                        container_name, name, fake_file.name
                    )
                )
            self.assertEqual(
                'bar',
                self.user_cloud.get_object_metadata(container_name, name)[
                    'foo'
                ],
            )
            self.user_cloud.update_object(
                container=container_name,
                name=name,
                metadata={'testk': 'testv'},
            )
            self.assertEqual(
                'testv',
                self.user_cloud.get_object_metadata(container_name, name)[
                    'testk'
                ],
            )

            with tempfile.NamedTemporaryFile() as fake_file:
                self.user_cloud.get_object(
                    container_name, name, outfile=fake_file.name
                )
                downloaded_content = open(fake_file.name, 'rb').read()
                self.assertEqual(fake_content, downloaded_content)

            self.assertEqual(
                name, self.user_cloud.list_objects(container_name)[0]['name']
            )
            self.assertTrue(
                self.user_cloud.delete_object(container_name, name)
            )
        self.assertEqual([], self.user_cloud.list_objects(container_name))
        self.assertEqual(
            container_name, self.user_cloud.list_containers()[0]['name']
        )
        self.user_cloud.delete_container(container_name)
