1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
"""DynamoDB cache backend. For usage details, see :ref:`Backends: DynamoDB <dynamodb>`.

.. automodsumm:: requests_cache.backends.dynamodb
   :classes-only:
   :nosignatures:
"""
from typing import Iterable, Optional
import boto3
from boto3.dynamodb.types import Binary
from boto3.resources.base import ServiceResource
from botocore.exceptions import ClientError
from requests_cache.backends.base import VT
from .._utils import get_valid_kwargs
from ..serializers import SerializerType, dynamodb_document_serializer
from . import BaseCache, BaseStorage, DictStorage
class DynamoDbCache(BaseCache):
    """Cache backend that keeps responses in a DynamoDB table.

    By default, responses are only partially serialized into a DynamoDB-compatible document format.

    Args:
        table_name: DynamoDB table name; also used as the cache name
        ttl: Use DynamoDB TTL to automatically remove expired items
        connection: :boto3:`DynamoDB Resource <services/dynamodb/service-resource/index.html#service-resource>`
            object to use instead of creating a new one
        decode_content: Forwarded unchanged to the response storage
        serializer: Custom serializer for the response storage; when omitted, the storage's own
            default is used
        kwargs: Additional keyword arguments, passed both to the base cache and to the response
            storage (which uses them for :py:meth:`~boto3.session.Session.resource`)
    """

    def __init__(
        self,
        table_name: str = 'http_cache',
        *,
        ttl: bool = True,
        connection: Optional[ServiceResource] = None,
        decode_content: bool = True,
        serializer: Optional[SerializerType] = None,
        **kwargs,
    ):
        super().__init__(cache_name=table_name, **kwargs)

        # Only forward a serializer when one was given, so that the storage class
        # can fall back to its own default otherwise
        storage_kwargs = dict(kwargs)
        if serializer:
            storage_kwargs['serializer'] = serializer

        self.responses = DynamoDbDict(
            table_name,
            ttl=ttl,
            connection=connection,
            decode_content=decode_content,
            **storage_kwargs,
        )

        # Redirects are kept in memory only; they are never written to DynamoDB
        self.redirects: BaseStorage[str, str] = DictStorage()
class DynamoDbDict(BaseStorage):
    """A dictionary-like interface for a DynamoDB table.

    Each entry is stored as one item with a string hash key named ``key``, the serialized
    response under ``value``, and (optionally) an expiration timestamp under ``ttl``.

    Args:
        table_name: DynamoDB table name
        ttl: Use DynamoDB TTL to automatically remove expired items
        connection: :boto3:`DynamoDB Resource <services/dynamodb/service-resource/index.html#service-resource>`
            object to use instead of creating a new one
        serializer: Serializer passed on to the base storage class
        kwargs: Additional keyword arguments for :py:meth:`~boto3.session.Session.resource`
    """

    def __init__(
        self,
        table_name: str,
        ttl: bool = True,
        connection: Optional[ServiceResource] = None,
        serializer: Optional[SerializerType] = dynamodb_document_serializer,
        **kwargs,
    ):
        super().__init__(serializer=serializer, **kwargs)
        # Keep only the keyword arguments that are relevant for creating a boto3 resource
        connection_kwargs = get_valid_kwargs(
            boto3.Session.__init__, kwargs, extras=['endpoint_url']
        )
        self.connection = connection or boto3.resource('dynamodb', **connection_kwargs)
        self.table_name = table_name
        self.ttl = ttl

        self._table = self.connection.Table(self.table_name)
        self._create_table()
        if ttl:
            self._enable_ttl()

    def _create_table(self):
        """Create a default table if one does not already exist"""
        try:
            self.connection.create_table(
                AttributeDefinitions=[
                    {'AttributeName': 'key', 'AttributeType': 'S'},
                ],
                TableName=self.table_name,
                KeySchema=[
                    {'AttributeName': 'key', 'KeyType': 'HASH'},
                ],
                BillingMode='PAY_PER_REQUEST',
            )
            self._table.wait_until_exists()
        # Ignore error if table already exists
        except ClientError as e:
            if e.response['Error']['Code'] != 'ResourceInUseException':
                raise

    def _enable_ttl(self):
        """Enable TTL, if not already enabled"""
        try:
            self.connection.meta.client.update_time_to_live(
                TableName=self.table_name,
                TimeToLiveSpecification={'AttributeName': 'ttl', 'Enabled': True},
            )
        # Ignore error if TTL is already enabled
        except ClientError as e:
            if e.response['Error']['Code'] != 'ValidationException':
                raise

    def _scan_items(self, **scan_kwargs):
        """Yield every item from a table scan, following pagination.

        A single scan call returns at most one page of results; when more data is available, the
        response contains ``LastEvaluatedKey``, which is passed back as ``ExclusiveStartKey`` to
        fetch the next page.

        Args:
            scan_kwargs: Keyword arguments passed to every ``Table.scan()`` call
        """
        while True:
            page = self._table.scan(**scan_kwargs)
            yield from page['Items']

            last_key = page.get('LastEvaluatedKey')
            if not last_key:
                break
            scan_kwargs['ExclusiveStartKey'] = last_key

    def __getitem__(self, key):
        """Fetch and deserialize the value stored under ``key``.

        Raises:
            KeyError: If no item with this key exists
        """
        result = self._table.get_item(Key={'key': key})
        if 'Item' not in result:
            raise KeyError(key)
        return self.deserialize(key, result['Item']['value'])

    def __setitem__(self, key, value):
        """Serialize ``value`` and store it under ``key``, replacing any existing item"""
        item = {'key': key, 'value': self.serialize(value)}

        # If enabled, set TTL value as a timestamp in unix format
        if self.ttl and getattr(value, 'expires_unix', None):
            item['ttl'] = value.expires_unix

        self._table.put_item(Item=item)

    def __delitem__(self, key):
        """Delete the item stored under ``key``.

        Raises:
            KeyError: If no item with this key exists
        """
        # Request the old item back so that a missing key can be detected
        response = self._table.delete_item(Key={'key': key}, ReturnValues='ALL_OLD')
        if 'Attributes' not in response:
            raise KeyError(key)

    def __iter__(self):
        """Iterate over all keys in the table, across all scan result pages"""
        # Alias 'key' attribute since it's a reserved keyword
        items = self._scan_items(
            ProjectionExpression='#k',
            ExpressionAttributeNames={'#k': 'key'},
        )
        for item in items:
            yield item['key']

    def __len__(self):
        """Get the number of items in the table.

        **Note:** This is an estimate, and is updated every 6 hours. A full table scan will use up
        your provisioned throughput, so it's not recommended.
        """
        return self._table.item_count

    def bulk_delete(self, keys: Iterable[str]):
        """Delete multiple keys from the cache. Does not raise errors for missing keys."""
        with self._table.batch_writer() as batch:
            for key in keys:
                batch.delete_item(Key={'key': key})

    def clear(self):
        """Delete all items from the table"""
        self.bulk_delete(self)

    def deserialize(self, key, value: VT):
        """Handle Binary objects from a custom serializer"""
        serialized_value = value.value if isinstance(value, Binary) else value
        return super().deserialize(key, serialized_value)

    def values(self):
        """Iterate over all deserialized values in the table, across all scan result pages"""
        for item in self._scan_items():
            yield self.deserialize(item['key'], item['value'])
|