File: core.py

package info (click to toggle)
dotenv-cli 3.4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 140 kB
  • sloc: python: 326; makefile: 62
file content (161 lines) | stat: -rw-r--r-- 3,699 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Core functions."""


# remove when we don't support py38 anymore
from __future__ import annotations

import atexit
import logging
import os
import re
from subprocess import Popen
from typing import NoReturn

logger = logging.getLogger(__name__)

# Escape sequences recognised inside double-quoted values, mapped to the
# character each one stands for.
_ESCAPES = {
    r"\a": "\a",
    r"\b": "\b",
    r"\f": "\f",
    r"\n": "\n",
    r"\r": "\r",
    r"\t": "\t",
    r"\v": "\v",
    r"\\": "\\",
}

# Matches exactly one of the escape sequences listed in _ESCAPES.
_ESCAPE_RE = re.compile(r"\\[abfnrtv\\]")


def _unescape(value: str) -> str:
    """Expand the supported backslash escapes in one left-to-right pass."""
    # A single pass is required for correctness: with chained
    # ``str.replace`` calls, the second backslash of an escaped backslash
    # pairs up with the following character, so ``\\n`` would be decoded
    # as backslash + newline instead of backslash + "n".
    return _ESCAPE_RE.sub(lambda match: _ESCAPES[match.group(0)], value)


def read_dotenv(filename: str) -> dict[str, str]:
    """Read dotenv file.

    Each line is expected to look like ``KEY=value``, optionally prefixed
    with ``export ``. Lines starting with ``#``, lines without a ``=`` and
    lines with an empty key are ignored. Whitespace around keys and values
    is stripped. Values enclosed in double quotes are unquoted and have
    their escape sequences (``\\a \\b \\f \\n \\r \\t \\v \\\\``) expanded;
    values enclosed in single quotes are unquoted and taken literally.

    If the file does not exist, a warning is logged and an empty dict is
    returned.

    Parameters
    ----------
    filename
        path to the filename

    Returns
    -------
    dict
        the variables defined in the file; if a key occurs more than once,
        the last occurrence wins

    """
    try:
        with open(filename) as fh:
            data = fh.read()
    except FileNotFoundError:
        logger.warning(
            f"{filename} does not exist, continuing without "
            "setting environment variables."
        )
        data = ""

    res: dict[str, str] = {}
    for line in data.splitlines():
        logger.debug(line)

        line = line.strip()

        # ignore comments
        if line.startswith("#"):
            continue

        # ignore empty lines or lines w/o '='
        if "=" not in line:
            continue

        # split on the first '=' only, the value may contain further '='
        key, value = line.split("=", 1)

        # allow export
        if key.startswith("export "):
            key = key.split(" ", 1)[-1]

        key = key.strip()
        value = value.strip()

        # an empty name is not a valid environment variable name; skip the
        # line like any other malformed line instead of returning an entry
        # that cannot be placed into an environment
        if not key:
            logger.warning(
                f"{filename}: ignoring line with empty variable name."
            )
            continue

        # remove quotes (not sure if this is standard behaviour)
        if len(value) >= 2 and value[0] == value[-1] == '"':
            # double quotes: strip them and un-escape escape characters
            value = _unescape(value[1:-1])
        elif len(value) >= 2 and value[0] == value[-1] == "'":
            # single quotes: strip them, keep the content verbatim
            value = value[1:-1]

        res[key] = value
    logger.debug(res)
    return res


def run_dotenv(
    filenames: list[str], command: list[str], replace: bool = False
) -> NoReturn | int:
    """Run a command with variables from dotenv files in its environment.

    The given files are read in order with ``read_dotenv``; when the same
    variable is defined more than once, the definition read last wins.

    Parameters
    ----------
    filenames
        paths to the .env files
    command
        command to execute, as a list of program name and arguments
    replace
        If true, the command only sees the variables from the .env files.
        Otherwise the variables are added on top of a copy of the current
        environment.

    Returns
    -------
    NoReturn | int
        When ``os.name`` is ``"posix"`` the current process is replaced by
        the command and the function does not return normally. Otherwise
        (e.g. on Windows) the exit status code of the spawned process is
        returned.

    """
    # merge all dotenv files, later files override earlier ones
    parsed: dict[str, str] = {}
    for path in filenames:
        parsed.update(read_dotenv(path))

    # either use the parsed variables alone or layer them over the
    # inherited environment
    environment = parsed if replace else {**os.environ, **parsed}

    if os.name == "posix":
        # execvpe replaces the running process image, nothing below runs
        os.execvpe(command[0], command, environment)

    # no exec-style replacement here: run the command as a child process
    child = Popen(
        command,
        universal_newlines=True,
        bufsize=0,
        shell=False,
        env=environment,
    )

    def kill_child() -> None:
        """Kill the child process.

        Signals should reach the child on their own, but a child may
        ignore some of them. Registering this as an exit handler makes
        sure the child does not outlive dotenv.

        """
        child.kill()

    # keep the exit handler installed only while we wait for the child
    atexit.register(kill_child)
    child.communicate()
    atexit.unregister(kill_child)

    return child.returncode