File: client_init.py

package info (click to toggle)
python-irodsclient 3.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,352 kB
  • sloc: python: 16,650; xml: 525; sh: 104; awk: 5; sql: 3; makefile: 3
file content (107 lines) | stat: -rwxr-xr-x 3,666 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python3

import contextlib
import getpass
import os
import sys
import textwrap

from irods import env_filename_from_keyword_args, derived_auth_filename
import irods.client_configuration as cfg
import irods.password_obfuscation as obf
import irods.helpers as h


@contextlib.contextmanager
def _open_file_for_protected_contents(file_path, *arg, **kw):
    """Open ``file_path`` under a restrictive umask and yield the file object.

    The process umask is set to 0o77 only around the ``open()`` call, so a
    file created by that call gets no group/other permission bits.  The
    previous umask is restored before control reaches the ``with`` body, and
    the file is closed when the body exits, whether normally or through an
    exception.

    A umask only affects files that ``open()`` creates; the permission bits
    of a file that already exists are not changed by this helper.

    Args:
        file_path: Path of the file to open.
        *arg: Extra positional arguments forwarded to ``open()`` (e.g. mode).
        **kw: Extra keyword arguments forwarded to ``open()``.

    Yields:
        The open file object.
    """
    old_mask = os.umask(0o77)
    try:
        f = open(file_path, *arg, **kw)
    finally:
        # The umask is process-wide state.  Put it back as soon as the file
        # has been created (or open() has failed) so that the caller's
        # ``with`` body, and anything else in the process, does not run
        # under the restrictive mask.
        os.umask(old_mask)
    try:
        yield f
    finally:
        f.close()


class irodsA_already_exists(Exception):
    """Raised when an .irodsA file already exists and overwriting is not enabled."""


def _write_encoded_auth_value(auth_file, encode_input, overwrite):
    """Write ``obf.encode(encode_input)`` to the file at ``auth_file``.

    The file is opened through ``_open_file_for_protected_contents``.

    Args:
        auth_file: Path of the .irodsA file to write.
        encode_input: Value passed to ``obf.encode`` to produce the content.
        overwrite: If true, an existing file at ``auth_file`` is replaced.
            If false, the file is only ever created, never replaced.

    Raises:
        RuntimeError: If ``auth_file`` is empty or None.
        irodsA_already_exists: If ``overwrite`` is false and ``auth_file``
            already exists.
    """
    if not auth_file:
        raise RuntimeError(f"Path to irodsA ({auth_file}) is null.")

    # Encode before touching the file, so a failure in obf.encode() cannot
    # leave an existing credentials file truncated.
    encoded = obf.encode(encode_input)

    # "x" (exclusive creation) makes the "must not already exist" check and
    # the file creation a single atomic step, instead of a separate
    # os.path.exists() test that another process could race against.
    mode = "w" if overwrite else "x"
    try:
        with _open_file_for_protected_contents(auth_file, mode) as irodsA:
            irodsA.write(encoded)
    except FileExistsError as exc:
        if overwrite:
            raise
        raise irodsA_already_exists(
            f"Overwriting not enabled and {auth_file} already exists."
        ) from exc


def write_native_credentials_to_secrets_file(password, overwrite=True, **kw):
    """Store the cleartext password, encoded, in the .irodsA file for native authentication.

    The environment file name is taken from the keyword arguments via
    env_filename_from_keyword_args(), and the .irodsA path is derived from it
    with derived_auth_filename().

    With overwrite=False, irodsA_already_exists is raised when an .irodsA file
    is already present at that path.
    """
    irods_env_path = env_filename_from_keyword_args(kw)
    _write_encoded_auth_value(
        derived_auth_filename(irods_env_path), password, overwrite
    )


def write_pam_credentials_to_secrets_file(password, overwrite=True, **kw):
    """Store a PAM-negotiated value, encoded, in the .irodsA file for PAM authentication.

    A session is obtained from h.make_session(), the given cleartext password
    is assigned to its account, and the session's ``pam_pw_negotiated``
    attribute is read inside a cfg.loadlines() context that is given entries
    for two ``legacy_auth.pam`` settings.  The first element of that result is
    what gets encoded and written to the session account's derived auth file.

    With overwrite=False, irodsA_already_exists is raised when an .irodsA file
    is already present at that path.  RuntimeError is raised when
    ``pam_pw_negotiated`` comes back empty.
    """
    # NOTE(review): **kw is accepted but not used anywhere in this function;
    # confirm whether it was meant to be forwarded to h.make_session().
    pam_overrides = [
        {"setting": "legacy_auth.pam.password_for_auto_renew", "value": None},
        {"setting": "legacy_auth.pam.store_password_to_environment", "value": False},
    ]

    session = h.make_session()
    session.pool.account.password = password

    negotiated = []
    # Presumably loadlines() applies these settings only for the duration of
    # the block; verify against irods.client_configuration.
    with cfg.loadlines(pam_overrides):
        negotiated = session.pam_pw_negotiated

    if not negotiated:
        raise RuntimeError("Password token was not passed from server.")

    _write_encoded_auth_value(
        session.pool.account.derived_auth_file, negotiated[0], overwrite
    )


if __name__ == "__main__":
    # Script mode: prompt for a password and write the secrets file for the
    # authentication scheme named by the single command-line argument.
    extra_help = textwrap.dedent(
        """
    This Python module also functions as a script to produce a "secrets" (i.e. encoded password) file.
    Similar to iinit in this capacity, if the environment - and where applicable, the PAM
    configuration for both system and user - is already set up in every other regard, this program
    will generate the secrets file with appropriate permissions and in the normal location, usually:

       ~/.irods/.irodsA

    The user will be interactively prompted to enter their cleartext password.
    """
    )

    # Maps each accepted AUTH_SCHEME argument to the function that writes
    # the secrets file for that scheme.
    vector = {
        "pam_password": write_pam_credentials_to_secrets_file,
        "native": write_native_credentials_to_secrets_file,
    }

    if len(sys.argv) != 2:
        # Wrong argument count: show the help text and the accepted schemes.
        print("{}\nUsage: {} AUTH_SCHEME".format(extra_help, sys.argv[0]))
        print("  AUTH_SCHEME:")
        for x in vector:
            print("    {}".format(x))
        sys.exit(1)
    elif sys.argv[1] in vector:
        vector[sys.argv[1]](getpass.getpass(prompt=f"{sys.argv[1]} password: "))
    else:
        print("did not recognize authentication scheme argument", file=sys.stderr)
        # Exit non-zero, as the usage branch does, so calling scripts can
        # tell that no secrets file was written.
        sys.exit(1)