File: test_farm_hierarchy_async.py

package info (click to toggle)
python-azure 20251118%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 783,356 kB
  • sloc: python: 6,474,533; ansic: 804; javascript: 287; sh: 205; makefile: 198; xml: 109
file content (97 lines) | stat: -rw-r--r-- 3,343 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# coding=utf-8
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import pytest
from datetime import datetime
from dateutil.parser import parse
from azure.core.exceptions import ResourceNotFoundError
from devtools_testutils.aio import recorded_by_proxy_async
from devtools_testutils import set_custom_default_matcher
from testcase_async import FarmBeatsAsyncTestCase
from testcase import FarmBeatsPowerShellPreparer


class TestFarmHierarchyAsync(FarmBeatsAsyncTestCase):
    """Async test case covering party operations on the FarmBeats client."""

    @FarmBeatsPowerShellPreparer()
    @recorded_by_proxy_async
    async def test_party_operations(self, **kwargs):
        """Exercise create, get, update and delete on a single party.

        The test creates a party, checks the immediate response against the
        request payload, reads it back, updates its name, verifies the update
        round-trips, deletes it and finally confirms that a subsequent get
        raises ``ResourceNotFoundError``.

        :keyword agrifood_endpoint: Endpoint passed to ``create_client``;
            popped from ``kwargs``.
        """
        set_custom_default_matcher(ignored_headers="Accept-Encoding")
        agrifood_endpoint = kwargs.pop("agrifood_endpoint")

        # Setup data
        party_id = "test-party-25486"
        party_request = {
            "name": "Test Party",
            "description": "Party created during testing.",
            "status": "Sample Status",
            "properties": {
                "foo": "bar",
                "numeric one": 1,
                # Integer key on purpose: the response is expected to expose
                # it under the string key "1" (asserted below).
                1: "numeric key"
            }
        }

        # Setup client
        client = self.create_client(agrifood_endpoint=agrifood_endpoint)

        # Close the client even when an assertion or service call fails.
        try:
            # Create
            party_response = await client.parties.create_or_update(
                party_id=party_id,
                party=party_request
            )

            # Assert on immediate response: compare against what was sent,
            # not against the response itself.
            assert party_response["id"] == party_id
            assert party_response["name"] == party_request["name"]
            assert party_response["description"] == party_request["description"]
            assert party_response["status"] == party_request["status"]

            assert len(party_response["properties"]) == 3
            assert party_response["properties"]["foo"] == "bar"
            assert party_response["properties"]["numeric one"] == 1
            assert party_response["properties"]["1"] == "numeric key"

            assert party_response["eTag"]
            # Both timestamps must be parseable date strings.
            assert isinstance(parse(party_response["createdDateTime"]), datetime)
            assert isinstance(parse(party_response["modifiedDateTime"]), datetime)

            # Retrieve created object
            retrieved_party = await client.parties.get(
                party_id=party_id)

            # Assert on retrieved object
            assert retrieved_party["id"] == party_id

            # Setup data for update
            party_request["name"] += " Updated"

            # Update
            updated_party = await client.parties.create_or_update(
                party_id=party_id,
                party=party_request
            )

            # Assert on immediate response: the name changes while the
            # creation timestamp stays the same as on the first create.
            assert updated_party["name"] == party_request["name"]
            assert updated_party["createdDateTime"] == party_response["createdDateTime"]

            # Retrieve updated object
            retrieved_party = await client.parties.get(
                party_id=party_id)

            # Assert updated object
            assert retrieved_party == updated_party

            # Delete
            await client.parties.delete(
                party_id=party_id)

            # Assert object doesn't exist anymore
            with pytest.raises(ResourceNotFoundError):
                await client.parties.get(
                    party_id=party_id)
        finally:
            await self.close_client()