File: write_to_ledger_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (167 lines) | stat: -rw-r--r-- 7,622 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""
FILE: write_to_ledger_async.py
DESCRIPTION:
    This sample demonstrates how to write to a Confidential Ledger. In this sample, we write some
    ledger entries and perform common retrieval operations.
USAGE:
    python write_to_ledger_async.py
    Set the environment variables with your own values before running the sample:
    1) CONFIDENTIALLEDGER_ENDPOINT - the endpoint of the Confidential Ledger.
"""

import asyncio
import logging
import os
import sys
import tempfile

from azure.confidentialledger.aio import ConfidentialLedgerClient
from azure.confidentialledger.certificate.aio import (
    ConfidentialLedgerCertificateClient,
)
from azure.core.exceptions import HttpResponseError
from azure.identity.aio import DefaultAzureCredential


# Root logger used by this sample for its own diagnostics.
LOG = logging.getLogger()
# Only surface errors on the console; lower-severity records are filtered out.
logging.basicConfig(level=logging.ERROR)


def _report_request_failure(error: "HttpResponseError") -> None:
    """Print the details of a failed Confidential Ledger request.

    The JSON error body returned by the service is preferred. The error's ``response`` may be
    ``None``, and the body is not guaranteed to be valid JSON; in both cases the exception itself
    is printed instead, so that reporting the failure never raises a second exception that would
    mask the original one.

    :param error: The exception raised by the failed client call.
    """
    detail: object = error
    response = error.response
    if response is not None:
        try:
            detail = response.json()
        except ValueError:
            # Body is not JSON (json.JSONDecodeError is a ValueError); keep the exception text.
            pass
    print("Request failed: {}".format(detail))


async def _print_current_ledger_entry(ledger_client) -> None:
    """Fetch the latest ledger entry and print its contents.

    :param ledger_client: An open async ``ConfidentialLedgerClient``.
    :raises HttpResponseError: Re-raised after the failure details have been printed.
    """
    try:
        current_ledger_entry = await ledger_client.get_current_ledger_entry()
        current_ledger_entry = current_ledger_entry["contents"]
        print(f"The current ledger entry is {current_ledger_entry}")
    except HttpResponseError as e:
        _report_request_failure(e)
        raise


async def main():
    """Write entries to a Confidential Ledger and read them back.

    Steps performed:
      1. Read the ledger endpoint from ``CONFIDENTIALLEDGER_ENDPOINT`` (exits with status 1 if it
         is not set).
      2. Fetch the ledger's TLS certificate and store it in a temporary directory.
      3. Write an entry, wait for it to be committed, and read the current entry.
      4. Write a second entry using the poller that waits for commit, and read the current entry.
      5. Query the first entry again by its transaction id.

    :raises HttpResponseError: If any ledger request fails; details are printed first.
    """
    # Set the values of the client ID, tenant ID, and client secret of the AAD application as
    # environment variables:
    #   AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET, CONFIDENTIALLEDGER_ENDPOINT
    try:
        ledger_endpoint = os.environ["CONFIDENTIALLEDGER_ENDPOINT"]
    except KeyError:
        LOG.error(
            "Missing environment variable 'CONFIDENTIALLEDGER_ENDPOINT' - "
            "please set it before running the example"
        )
        sys.exit(1)

    # Under the current URI format, the ledger id is the first part of the ledger endpoint.
    # i.e. https://<ledger id>.confidential-ledger.azure.com
    ledger_id = ledger_endpoint.replace("https://", "").split(".")[0]

    identity_service_client = ConfidentialLedgerCertificateClient()  # type: ignore[call-arg]
    async with identity_service_client:
        ledger_certificate = await identity_service_client.get_ledger_identity(
            ledger_id
        )

    # The Confidential Ledger's TLS certificate must be written to a file to be used by the
    # ConfidentialLedgerClient. Here, we write it to a temporary directory so that it is cleaned
    # up automatically when the `with` block exits.
    with tempfile.TemporaryDirectory() as tempdir:
        ledger_cert_file = os.path.join(tempdir, f"{ledger_id}.pem")
        with open(ledger_cert_file, "w") as outfile:
            outfile.write(ledger_certificate["ledgerTlsCertificate"])

        print(
            f"Ledger certificate has been written to {ledger_cert_file}. "
            "It will be deleted when the script completes."
        )

        # Build a client through AAD
        credential = DefaultAzureCredential()
        ledger_client = ConfidentialLedgerClient(
            ledger_endpoint,
            credential=credential,
            ledger_certificate_path=ledger_cert_file,
        )

        # Using the async objects as context managers ensures they are properly closed after use.
        # The credential is entered first and therefore closed last, after the client.
        async with credential, ledger_client:
            # Write a ledger entry.
            try:
                post_entry_result = await ledger_client.create_ledger_entry(
                    {"contents": "Hello world!"}
                )
                transaction_id = post_entry_result["transactionId"]
                print(
                    f"Successfully sent a ledger entry to be written. It will become durable "
                    f"at transaction id {transaction_id}"
                )
            except HttpResponseError as e:
                _report_request_failure(e)
                raise

            # For some scenarios, users may want to eventually ensure the written entry is
            # durably committed.
            try:
                print(
                    f"Waiting for {transaction_id} to become durable. This may be skipped for "
                    "when writing less important entries where client throughput is "
                    "prioritized."
                )
                wait_poller = await ledger_client.begin_wait_for_commit(transaction_id)  # type: ignore[attr-defined]
                await wait_poller.wait()
                print(
                    f"Ledger entry at transaction id {transaction_id} has been committed "
                    "successfully"
                )
            except HttpResponseError as e:
                _report_request_failure(e)
                raise

            # Get the latest ledger entry.
            await _print_current_ledger_entry(ledger_client)

            # Users may wait for a durable commit when writing a ledger entry though this will
            # reduce client throughput.
            try:
                print(
                    "Writing another entry. This time, we'll have the client method wait for "
                    "commit."
                )
                post_poller = await ledger_client.begin_create_ledger_entry(  # type: ignore[attr-defined]
                    {"contents": "Hello world again!"}
                )
                new_post_result = await post_poller.result()
                print(
                    "The new ledger entry has been committed successfully at transaction id "
                    f'{new_post_result["transactionId"]}'
                )
            except HttpResponseError as e:
                _report_request_failure(e)
                raise

            # Get the latest ledger entry.
            await _print_current_ledger_entry(ledger_client)

            # Make a query for a prior ledger entry. The service may take some time to load the
            # result, so a poller is provided.
            try:
                get_entry_poller = await ledger_client.begin_get_ledger_entry(transaction_id)  # type: ignore[attr-defined]
                get_entry_result = await get_entry_poller.result()
                print(
                    f'At transaction id {get_entry_result["entry"]["transactionId"]}, the '
                    f'ledger entry contains \'{get_entry_result["entry"]["contents"]}\''
                )
            except HttpResponseError as e:
                _report_request_failure(e)
                raise


# Run the sample only when this file is executed as a script (not when it is imported).
if __name__ == "__main__":
    # asyncio.run() drives the main() coroutine to completion on a fresh event loop.
    asyncio.run(main())