1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import json
import random
import uuid
from typing import Dict, Any
from azure.cosmos import PartitionKey
from azure.cosmos import CosmosClient
import azure.cosmos.exceptions as exceptions
import config
from azure.identity import DefaultAzureCredential
from azure.cosmos.http_constants import HttpHeaders
# ----------------------------------------------------------------------------------------------------------
# Prerequisites -
#
# 1. An Azure Cosmos account -
# https://azure.microsoft.com/documentation/articles/documentdb-create-account/
#
# 2. Microsoft Azure Cosmos PyPi package -
# https://pypi.python.org/pypi/azure-cosmos/
# ----------------------------------------------------------------------------------------------------------
# Sample - demonstrates how to manage session tokens. By default, the SDK manages session tokens for you. These samples
# are for use cases where you want to manage session tokens yourself.
#
# 1. Storing session tokens in a cache by feed range from the partition key.
#
# 2. Storing session tokens in a cache by feed range from the container.
#
# ----------------------------------------------------------------------------------------------------------
# Note -
#
# Running this sample will create (and delete) multiple Containers on your account.
# Each time a Container is created the account will be billed for 1 hour of usage based on
# the provisioned throughput (RU/s) of that account.
# ----------------------------------------------------------------------------------------------------------
HOST = config.settings['host']
CREDENTIAL = DefaultAzureCredential()
DATABASE_ID = config.settings['database_id']
CONTAINER_ID = config.settings['container_id']
def storing_session_tokens_pk(container):
print('1. Storing session tokens in a cache by feed range from the partition key.')
cache: Dict[str, Any] = {}
# Everything below is just a simulation of what could be run on different machines and clients
# to store session tokens in a cache by feed range from the partition key.
# The cache is a Dict here for simplicity but in a real-world scenario, it would be some service.
feed_ranges_and_session_tokens = []
previous_session_token = ""
# populating cache with session tokens
for i in range(5):
item = {
'id': 'item' + str(uuid.uuid4()),
'name': 'sample',
'pk': 'A' + str(random.randint(1, 10))
}
target_feed_range = container.feed_range_from_partition_key(item['pk'])
response = container.create_item(item, session_token=previous_session_token)
session_token = response.get_response_headers()[HttpHeaders.SessionToken]
# adding everything in the cache in case consolidation is possible
for feed_range_json, session_token_cache in cache.items():
feed_range = json.loads(feed_range_json)
feed_ranges_and_session_tokens.append((feed_range, session_token_cache))
feed_ranges_and_session_tokens.append((target_feed_range, session_token))
latest_session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range)
# only doing this for the key to be immutable
feed_range_json = json.dumps(target_feed_range)
cache[feed_range_json] = latest_session_token
previous_session_token = session_token
def storing_session_tokens_container_feed_ranges(container):
print('2. Storing session tokens in a cache by feed range from the container.')
# The cache is a dictionary here for simplicity but in a real-world scenario, it would be some service.
cache: Dict[str, Any] = {}
# Everything below is just a simulation of what could be run on different machines and clients
# to store session tokens in a cache by feed range from the partition key.
feed_ranges_and_session_tokens = []
previous_session_token = ""
feed_ranges = list(container.read_feed_ranges())
# populating cache with session tokens
for i in range(5):
item = {
'id': 'item' + str(uuid.uuid4()),
'name': 'sample',
'pk': 'A' + str(random.randint(1, 10))
}
feed_range_from_pk = container.feed_range_from_partition_key(item['pk'])
response = container.create_item(item, session_token=previous_session_token)
session_token = response.get_response_headers()[HttpHeaders.SessionToken]
# adding everything in the cache in case consolidation is possible
for feed_range_json, session_token_cache in cache.items():
feed_range = json.loads(feed_range_json)
feed_ranges_and_session_tokens.append((feed_range, session_token_cache))
target_feed_range: dict = next(
(feed_range for feed_range in feed_ranges if container.is_feed_range_subset(feed_range, feed_range_from_pk)),
{}
)
feed_ranges_and_session_tokens.append((target_feed_range, session_token))
latest_session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range)
# only doing this for the key to be immutable
feed_range_json = json.dumps(target_feed_range)
cache[feed_range_json] = latest_session_token
previous_session_token = session_token
def run_sample():
with CosmosClient(HOST, CREDENTIAL) as client:
try:
db = client.create_database_if_not_exists(id=DATABASE_ID)
container = db.create_container_if_not_exists(id=CONTAINER_ID, partition_key=PartitionKey('/pk'))
# example of storing session tokens in cache by feed range from the partition key
storing_session_tokens_pk(container)
# example of storing session tokens in cache by feed range from the container
storing_session_tokens_container_feed_ranges(container)
# cleanup database after sample
try:
client.delete_database(db)
except exceptions.CosmosResourceNotFoundError:
pass
except exceptions.CosmosHttpResponseError as e:
print('\nrun_sample has caught an error. {0}'.format(e.message))
finally:
print("\nrun_sample done")
if __name__ == '__main__':
run_sample()
|