File: init_serverless_sdk.py

package info (click to toggle)
sentry-python 2.18.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,004 kB
  • sloc: python: 55,908; makefile: 114; sh: 111; xml: 2
file content (80 lines) | stat: -rw-r--r-- 2,765 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
For manual instrumentation,
the handler function string of an AWS Lambda function should be added as an
environment variable with the key 'SENTRY_INITIAL_HANDLER', along with the DSN
(read from the 'SENTRY_DSN' environment variable).
Then the handler function string should be replaced with
'sentry_sdk.integrations.init_serverless_sdk.sentry_lambda_handler'
"""

import os
import sys
import re

import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any


# Configure Sentry SDK
# NOTE: this call runs at import time. Both `SENTRY_DSN` and
# `SENTRY_TRACES_SAMPLE_RATE` are read with `os.environ[...]`, so a missing
# variable raises `KeyError`, and a sample rate that `float()` cannot parse
# raises `ValueError`.
sentry_sdk.init(
    dsn=os.environ["SENTRY_DSN"],
    integrations=[AwsLambdaIntegration(timeout_warning=True)],
    traces_sample_rate=float(os.environ["SENTRY_TRACES_SAMPLE_RATE"]),
)


class AWSLambdaModuleLoader:
    """
    Loads the module that contains a lambda handler and exposes the handler.

    The handler string has the form `<module path>.<handler name>`. The module
    path is either a dotted import path (e.g. `package.module`) or a
    `/`-separated file path without the `.py` suffix
    (e.g. `scheduler/scheduler/event`).
    """

    # Matches module paths that contain at least one `/` followed by a
    # non-empty final component (captured as group 2). Anything that does not
    # match is treated as a dotted import path.
    DIR_PATH_REGEX = r"^(.+)\/([^\/]+)$"

    def __init__(self, sentry_initial_handler):
        # type: (str) -> None
        """
        Split the handler string into module path and handler name, then load
        the module.

        Raises ValueError if `sentry_initial_handler` contains no "." to
        separate the module path from the handler name.
        """
        try:
            # Split on the LAST dot only, so dotted module paths stay intact.
            module_path, self.handler_name = sentry_initial_handler.rsplit(".", 1)
        except ValueError as exc:
            raise ValueError("Incorrect AWS Handler path (Not a path)") from exc

        self.extract_and_load_lambda_function_module(module_path)

    def extract_and_load_lambda_function_module(self, module_path):
        # type: (str) -> None
        """
        Method that extracts and loads lambda function module from module_path

        The loaded module is stored on `self.lambda_function_module`.

        Raises ValueError if `module_path` is a file path and the running
        Python version is older than 3.6.
        """
        path_match = re.match(self.DIR_PATH_REGEX, module_path)

        if path_match is None:
            # Plain dotted path -> regular import machinery.
            import importlib

            self.lambda_function_module = importlib.import_module(module_path)
            return

        py_version = sys.version_info
        if py_version < (3, 6):
            # `sys.version_info` is a tuple, so it must not be passed bare to
            # the `%` operator (that would be read as five format arguments
            # and raise TypeError instead of this ValueError).
            raise ValueError(
                "Python version %s is not supported."
                % ".".join(str(part) for part in py_version[:3])
            )

        # With a path like -> `scheduler/scheduler/event`
        # `module_name` is `event`, and `module_file_path` is `scheduler/scheduler/event.py`
        # The name comes from the regex capture so that it is split on the
        # same `/` separator that classified the value as a file path.
        module_name = path_match.group(2)
        module_file_path = module_path + ".py"

        import importlib.util

        spec = importlib.util.spec_from_file_location(module_name, module_file_path)
        self.lambda_function_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(self.lambda_function_module)

    def get_lambda_handler(self):
        # type: () -> Any
        """
        Return the attribute named `self.handler_name` from the loaded module.

        Raises AttributeError if the module has no such attribute.
        """
        return getattr(self.lambda_function_module, self.handler_name)


def sentry_lambda_handler(event, context):
    # type: (Any, Any) -> Any
    """
    Handler function that invokes the lambda handler whose path is defined in
    the environment variable "SENTRY_INITIAL_HANDLER".

    `event` and `context` are forwarded unchanged to that handler, and its
    return value is returned to the caller.

    Raises KeyError if "SENTRY_INITIAL_HANDLER" is not set, and ValueError if
    its value contains no "." separating the module path from the handler name.
    """
    # A new loader is built on every call, so the environment variable is
    # re-read and the handler module is resolved again on each invocation.
    initial_handler_path = os.environ["SENTRY_INITIAL_HANDLER"]
    initial_handler = AWSLambdaModuleLoader(initial_handler_path).get_lambda_handler()
    return initial_handler(event, context)