File: test_customisation.py

package info (click to toggle)
eumdac 3.0.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 716 kB
  • sloc: python: 7,325; makefile: 6
file content (170 lines) | stat: -rw-r--r-- 7,003 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import fnmatch
import io
import time
import shutil
import unittest
from datetime import datetime

from pytest import skip

from eumdac.token import AccessToken
from eumdac.datastore import DataStore
from eumdac.tailor_models import Chain
from eumdac.datatailor import DataTailor
from eumdac.customisation import Customisation, AlreadyDeletedCustomisationError

from .base import DataServiceTestCase, INTEGRATION_TESTING


class TestCustomisation(DataServiceTestCase):
    def setUp(self):
        super().setUp()
        self.token = AccessToken(self.credentials)
        self.datatailor = DataTailor(self.token)

    @unittest.skip("Temporarily skipping")
    def test_full_customisation_process(self):
        # format conversion
        chain_config = Chain(product="HRSEVIRI", format="netcdf4", quicklook="hrseviri_png")
        # search for the last product in June 2020
        datastore = DataStore(self.token)
        hrseviri = datastore.get_collection("EO:EUM:DAT:MSG:HRSEVIRI")
        product = hrseviri.search(dtend=datetime(2020, 7, 1)).first()
        # use context manager, to automatically delete customisations at the end even
        # if an error occurs
        with self.datatailor.new_customisation(product, chain_config) as customisation:
            if not INTEGRATION_TESTING:
                customisation.update_margin = 0
            timeout = 900  # needed to increase to 12 minutes, overstrained DT...
            tic = time.time()
            while time.time() - tic < timeout:
                if INTEGRATION_TESTING:
                    time.sleep(5)
                if customisation.status == "DONE":
                    break
            else:
                raise TimeoutError(f"Cutomisation took longer than {timeout}s")
            # test streaming
            (png_aux_xml,) = fnmatch.filter(customisation.outputs, "*.png.aux.xml")
            with customisation.stream_output(png_aux_xml) as stream:
                file = io.BytesIO()
                shutil.copyfileobj(stream, file)
            # check properties
            self.assertIsInstance(customisation.creation_time, datetime)
            self.assertIsInstance(customisation.backend, str)
            self.assertIsInstance(customisation.product_type, str)
            self.assertIsInstance(customisation.processing_steps, list)
            self.assertIsInstance(customisation.status, str)
            self.assertIsInstance(customisation.progress, int)
            self.assertIsInstance(customisation.duration, int)
            self.assertIsInstance(customisation.outputs, list)
            self.assertIsInstance(customisation.logfile, str)

        expected_xml_length = 870
        self.assertEqual(file.tell(), expected_xml_length)

        # check that customisation has been removed
        delete_url = self.datatailor.urls.get("tailor", "delete")
        self.requests_mock.assert_call_count(delete_url, 1)

    @unittest.skipIf(INTEGRATION_TESTING, "Integration already covered!")
    def test_properties_cache(self):
        customisation_id = "abcdef123"
        url = self.datatailor.urls.get(
            "tailor", "customisation", vars={"customisation_id": customisation_id}
        )
        self.requests_mock.add(
            "GET",
            url,
            json={
                customisation_id: {
                    "creation_time": datetime.now().strftime(Customisation._creation_time_format),
                    "backend_id": "backend-id",
                    "product_id": "product-id",
                    "required_processing_steps": ["a", "b", "c"],
                    "status": "DONE",
                    "progress": 100,
                    "processing_duration": 123,
                    "output_products": ["file.nc", "file.xml", "file.png"],
                }
            },
        )
        self.token._expiration = time.time() + 1000
        self.token._access_token = "token"
        customisation = Customisation(customisation_id, self.datatailor)

        creation_time = customisation.creation_time
        backend = customisation.backend
        self.assertIsInstance(creation_time, datetime)
        self.assertIsInstance(backend, str)
        self.requests_mock.assert_call_count(url, 1)

        # force reload
        customisation.update_margin = 0
        status = customisation.status
        self.assertIsInstance(status, str)
        self.requests_mock.assert_call_count(url, 2)

    @unittest.skipIf(INTEGRATION_TESTING, "Integration already covered!")
    def test_string_representation(self):
        customisation_id = "abcdef123"
        customisation = Customisation(customisation_id, self.datatailor)
        self.assertEqual(customisation_id, str(customisation))
        self.assertIn(customisation_id, repr(customisation))

    @unittest.skipIf(INTEGRATION_TESTING, "Test Failure")
    def test_cannot_fails_when_deleted(self):
        self.requests_mock.add("PATCH", self.datatailor.urls.get("tailor", "delete"))
        self.token._expiration = time.time() + 1000
        self.token._access_token = "token"
        customisation_id = "abcdef123"
        customisation = Customisation(customisation_id, self.datatailor)
        customisation.delete()

        message = "Customisation already deleted."
        with self.assertRaisesRegex(AlreadyDeletedCustomisationError, message):
            customisation.status

        with self.assertRaisesRegex(AlreadyDeletedCustomisationError, message):
            customisation.logfile

        with self.assertRaisesRegex(AlreadyDeletedCustomisationError, message):
            with customisation.stream_output("file.dat") as stream:
                data = stream.read()
                del data

    @unittest.skipIf(INTEGRATION_TESTING, "Test Failure")
    def test_fail_on_unknown_stream_output(self):
        customisation_id = "abcdef123"
        url = self.datatailor.urls.get(
            "tailor", "customisation", vars={"customisation_id": customisation_id}
        )
        self.requests_mock.add(
            "GET",
            url,
            json={customisation_id: {"output_products": ["file.nc", "file.xml", "file.png"]}},
        )
        self.token._expiration = time.time() + 1000
        self.token._access_token = "token"
        customisation_id = "abcdef123"
        customisation = Customisation(customisation_id, self.datatailor)
        message = r"file\.dat not in "
        with self.assertRaisesRegex(ValueError, message):
            with customisation.stream_output("file.dat") as stream:
                data = stream.read()
                del data

    @classmethod
    def prepare_integration_test(cls):
        prepare_test_customisation(cls.credentials)


def prepare_test_customisation(credentials):
    """delete all existing customisations"""
    token = AccessToken(credentials)
    datatailor = DataTailor(token)
    for customisation in datatailor.customisations:
        try:
            customisation.delete()
        except:
            pass