File: utils.py

package info (click to toggle)
python-model-bakery 1.20.5-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 532 kB
  • sloc: python: 4,298; sh: 149; makefile: 21
file content (128 lines) | stat: -rw-r--r-- 4,635 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import datetime
import importlib
import inspect
import itertools
import warnings
from types import ModuleType
from typing import Any, Callable, Optional, Union

from django.apps import apps

from .timezone import tz_aware

# Names exported by a star-import of this module.
__all__ = ["import_from_str", "get_calling_module", "seq"]


def import_from_str(import_string: Optional[Union[Callable, str]]) -> Any:
    """Resolve a dotted-path string to the object it names.

    Anything that is not a `str` is handed back untouched. A string is split
    on its last dot into a path and a name (`path.to.module.object_name`).
    The Django app registry is consulted first, using the path as the key into
    `apps.all_models` and the lower-cased name as the model key; when no model
    is found there, the path is imported as a module and the name is fetched
    from it as an attribute.

    Args:
        import_string: a dotted path to import, or an already-resolved object

    Returns:
        The registered model, the imported attribute, or `import_string`
        itself when it is not a string.
    """
    if not isinstance(import_string, str):
        return import_string

    # A string without any dot cannot be unpacked here and raises ValueError.
    module_path, attr_name = import_string.rsplit(".", 1)

    if apps:
        registered_models = apps.all_models.get(module_path, {})
        found = registered_models.get(attr_name.lower())
        if found:
            return found

    return getattr(importlib.import_module(module_path), attr_name)


def get_calling_module(levels_back: int) -> Optional[ModuleType]:
    """Return the module that owns a frame further up the call stack.

    `levels_back` is counted from the code that calls this function, so any
    intermediate frames between the "real" caller and this call must be
    included in the count.

    Args:
        levels_back (int): Number of stack frames back from the caller of
            this function (0 means the caller itself)

    Returns:
        (ModuleType): the module the selected frame belongs to, or `None`
            when `inspect.getmodule` cannot determine it
    """
    # Entry 0 of the stack is this function's own frame, hence the +1 offset.
    caller_record = inspect.stack()[levels_back + 1]
    return inspect.getmodule(caller_record.frame)


def seq(value, increment_by=1, start=None, suffix=None):
    """Yield an endless series of values derived from `value`.

    The kind of series depends on the type of `value`: numbers and strings
    are combined with a running counter, while `datetime`, `date` and `time`
    values are advanced by a fixed `timedelta` on every step. The arguments
    are validated when the first item is requested, because this is a
    generator.

    Args:
        value (object): the base of the sequence. For numbers and strings
            the counter is added/appended to it; for `datetime`, `date` and
            `time` it is the anchor and the first generated item is
            `value + increment_by`.
        increment_by (`int`, `float` or `timedelta`, optional): the step
            between generated values (defaults to `1`); must be a
            `timedelta` when `value` is a `datetime`, `date` or `time`
        start (`int` or `float`, optional): first counter value; when left
            as `None` the counter starts at `increment_by`. Not used for
            `datetime`, `date` and `time` values.
        suffix (`str`, optional): for `str` `value` sequences, this will be
            appended after the counter in each generated value

    Returns:
        object: generated values for sequential data
    """
    _validate_sequence_parameters(value, increment_by, start, suffix)

    temporal_types = (datetime.datetime, datetime.date, datetime.time)

    if not isinstance(value, temporal_types):
        first = start if start is not None else increment_by
        counter = itertools.count(first, increment_by)
        if suffix:
            for number in counter:
                yield value + str(number) + suffix
        else:
            # Cast the counter to the type of `value` so `+` is well defined.
            convert = type(value)
            for number in counter:
                yield value + convert(number)
        return

    # Exact type checks: `datetime` is a subclass of `date`, so isinstance
    # could not tell the two apart.
    wants_date = type(value) is datetime.date
    wants_time = type(value) is datetime.time

    if wants_date:
        anchor = datetime.datetime.combine(value, datetime.datetime.now().time())
    elif wants_time:
        anchor = datetime.datetime.combine(datetime.date.today(), value)
    else:
        anchor = value

    # Express the anchor as seconds since 1970-01-01 in the anchor's own tzinfo.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=anchor.tzinfo)
    anchor_seconds = (anchor - epoch).total_seconds()
    step_seconds = increment_by.total_seconds()

    for offset in itertools.count(step_seconds, step_seconds):
        moment = datetime.datetime.fromtimestamp(
            anchor_seconds + offset, tz=datetime.timezone.utc
        )
        # NOTE(review): tz_aware comes from .timezone; its effect on the
        # value is not visible from this function.
        moment = tz_aware(moment)

        if wants_time:
            yield moment.time()
        elif wants_date:
            yield moment.date()
        else:
            yield moment


def _validate_sequence_parameters(value, increment_by, start, suffix) -> None:
    if suffix:
        if not isinstance(suffix, str):
            raise TypeError("Sequences suffix can only be a string")

        if not isinstance(value, str):
            raise TypeError("Sequences with suffix can only be used with text values")

    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        if not isinstance(increment_by, datetime.timedelta):
            raise TypeError(
                "Sequences with values datetime.datetime, datetime.date and datetime.time, "
                "incremente_by must be a datetime.timedelta."
            )

        if start:
            warnings.warn(
                "start parameter is ignored when using seq with date, time or datetime objects",
                stacklevel=1,
            )