File: __init__.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (71 lines) | stat: -rw-r--r-- 2,272 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import os
from functools import wraps

import boto3

from moto import mock_aws
from tests.test_dynamodb import dynamodb_aws_verified


def application_autoscaling_aws_verified():
    """
    Decorator factory for Application Auto Scaling tests verified against AWS.

    The decorated test talks to real AWS when the environment variable
    ``MOTO_TEST_ALLOW_AWS_REQUEST`` equals ``true`` (case-insensitive).
    In every other case the test is executed inside a ``mock_aws`` context.

    For each invocation of the decorated test, the wrapper:
      - registers a Scalable Target (read capacity, min 1 / max 4) for a
        DynamoDB table whose name is handed in via ``dynamodb_aws_verified``
      - calls the test with the extra keyword arguments ``client``,
        ``namespace``, ``resource_id`` and ``scalable_dimension``
      - deregisters the Scalable Target, even when the test raises

    The test's own return value is passed back to the caller.
    """

    def inner(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Read the opt-in flag up front; anything other than "true" means mocked.
            env_flag = os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false")
            use_real_aws = env_flag.lower() == "true"

            @dynamodb_aws_verified()
            def scalable_target_with_dynamodb_table(table_name):
                # The identifying triple is shared by the register and
                # deregister calls, so build it once.
                target = {
                    "ServiceNamespace": "dynamodb",
                    "ResourceId": f"table/{table_name}",
                    "ScalableDimension": "dynamodb:table:ReadCapacityUnits",
                }
                client = boto3.client(
                    "application-autoscaling", region_name="us-east-1"
                )

                # Expose the fixture details to the test as keyword arguments.
                kwargs.update(
                    client=client,
                    namespace=target["ServiceNamespace"],
                    resource_id=target["ResourceId"],
                    scalable_dimension=target["ScalableDimension"],
                )

                client.register_scalable_target(
                    MinCapacity=1,
                    MaxCapacity=4,
                    **target,
                )
                try:
                    return func(*args, **kwargs)
                finally:
                    # Always clean up, regardless of the test outcome.
                    client.deregister_scalable_target(**target)

            if not use_real_aws:
                with mock_aws():
                    return scalable_target_with_dynamodb_table()
            return scalable_target_with_dynamodb_table()

        return wrapper

    return inner