File: utils.py

package info (click to toggle)
secrets 11.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,744 kB
  • sloc: python: 6,509; xml: 7; makefile: 4
file content (128 lines) | stat: -rw-r--r-- 3,958 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# SPDX-License-Identifier: GPL-3.0-only
from __future__ import annotations

import logging
import secrets
import stat
import typing
from gettext import gettext as _
from pathlib import Path
from typing import Generic, TypeVar

from Cryptodome.Cipher import AES
from Cryptodome.Random import get_random_bytes
from gi.repository import Gio, GLib, Gtk

if typing.TYPE_CHECKING:
    from collections.abc import Callable


# File attribute name queried by get_host_path() to look up a file's host path.
ATTRIBUTE_HOST_PATH = "xattr::document-portal.host-path"


def format_time(time: GLib.DateTime | None, hours: bool = True) -> str:
    """Display a DateTime in the local timezone.

    Args:
        time: The date/time to display, or ``None``.
        hours: When true, the time of day (``%R``) is appended to the date.

    Returns:
        The formatted string, or an empty string when ``time`` is ``None``.
    """
    if time is None:
        return ""

    time_format = "%e %b %Y"
    if hours:
        # The date and the time are separated by a U+2002 En Space. It is
        # written as an escape so the character cannot be silently replaced
        # by a regular space by an editor or a copy/paste.
        time_format += "\u2002%R"

    return time.to_local().format(time_format)


def create_random_data(bytes_buffer):
    """Return ``bytes_buffer`` random bytes obtained from ``secrets.token_bytes``."""
    random_bytes = secrets.token_bytes(bytes_buffer)
    return random_bytes


def generate_keyfile_sync(gfile: Gio.File) -> str:
    """Write a newly generated keyfile to ``gfile`` and return its hash.

    96 random bytes are encrypted with AES in EAX mode under a random 32-byte
    key. The keyfile contents are the cipher nonce, the authentication tag
    and the ciphertext, concatenated in that order.

    The contents replace whatever ``gfile`` held before. When ``gfile`` has a
    local path, the file is afterwards made read-only.

    Returns:
        The SHA1 checksum of the keyfile contents, as computed by
        ``GLib.compute_checksum_for_data``.
    """
    aes_key = get_random_bytes(32)
    eax = AES.new(aes_key, AES.MODE_EAX)
    payload = create_random_data(96)
    encrypted, mac = eax.encrypt_and_digest(payload)  # type: ignore
    blob = eax.nonce + mac + encrypted  # type: ignore
    checksum = GLib.compute_checksum_for_data(GLib.ChecksumType.SHA1, blob)

    gfile.replace_contents(
        blob,
        None,
        False,
        Gio.FileCreateFlags.REPLACE_DESTINATION | Gio.FileCreateFlags.PRIVATE,
        None,
    )

    # Only a file with a local path can have its mode changed to read-only.
    local_path = gfile.get_path()
    if local_path:
        Path(local_path).chmod(stat.S_IREAD)

    return checksum


def generate_keyfile_async(gfile: Gio.File, callback: Gio.AsyncReadyCallback) -> None:
    """Generate a keyfile for ``gfile`` in a thread, using a ``Gio.Task``.

    The task is created with ``gfile`` as its source object and ``callback``
    as its ready callback. On success the task returns the keyfile hash; a
    ``GLib.Error`` raised during generation is returned as the task's error.
    """

    def run_generation(job, source, _data, _cancellable):
        try:
            digest = generate_keyfile_sync(source)
        except GLib.Error as error:
            job.return_error(error)
            return

        job.return_value(digest)

    keyfile_task = Gio.Task.new(gfile, None, callback)
    keyfile_task.run_in_thread(run_generation)


def generate_keyfile_finish(result: Gio.AsyncResult) -> tuple[bool, str]:
    """Return the outcome of ``result.propagate_value()``.

    Nothing is caught here: whatever ``propagate_value()`` raises reaches the
    caller.
    """
    outcome = result.propagate_value()
    return outcome


def compare_passwords(pass1: str | None, pass2: str | None) -> bool:
    """Tell whether two optional passwords are equal.

    Two actual passwords are compared through ``secrets.compare_digest`` on
    their UTF-8 encodings. If at least one of them is ``None``, they are
    equal only when both are ``None``.
    """
    if pass1 is None or pass2 is None:
        # At least one side is None here, so identity holds only if both are.
        return pass1 is pass2

    first = bytes(pass1, "utf-8")
    second = bytes(pass2, "utf-8")
    return secrets.compare_digest(first, second)


async def get_host_path(gfile: Gio.File) -> str | None:
    """Return the host-path attribute of ``gfile``, or else its own path.

    The result of ``gfile.get_path()`` is returned when the attribute is
    unset or empty, and also when querying the file info fails with a
    ``GLib.Error`` (the failure is logged).
    """
    try:
        file_info = await gfile.query_info_async(
            ATTRIBUTE_HOST_PATH, Gio.FileQueryInfoFlags.NONE, GLib.PRIORITY_DEFAULT
        )
    except GLib.Error:
        logging.exception("Could not query file info")
        host_path = None
    else:
        host_path = file_info.get_attribute_string(ATTRIBUTE_HOST_PATH)

    if not host_path:
        return gfile.get_path()

    return host_path


class KeyFileFilter:
    """Holds a ``Gtk.FileFilter`` set up with the MIME types used for keyfiles.

    The configured filter is available as the ``file_filter`` attribute.
    """

    def __init__(self):
        chooser_filter = Gtk.FileFilter()
        self.file_filter = chooser_filter

        chooser_filter.set_name(_("Keyfile"))
        for mime_type in (
            "application/octet-stream",
            "application/x-keepass2",
            "text/plain",
            "application/x-iwork-keynote-sffkey",
        ):
            chooser_filter.add_mime_type(mime_type)


T = TypeVar("T")


class LazyValue(Generic[T]):
    """A lazy value, i.e. a value which is only computed when it is actually needed.

    ``compute`` is called on the first read of ``value``. Its result is cached
    and handed back on every later read, including when that result is
    ``None``. If ``compute`` raises, nothing is cached and the next read calls
    it again.
    """

    def __init__(self, compute: Callable[[], T]):
        """Store ``compute`` without calling it."""
        self._value: T | None = None
        self._compute = compute
        # Tracked separately from ``_value`` so that a computed ``None`` is
        # cached rather than mistaken for "not computed yet".
        self._computed = False

    @property
    def value(self) -> T:
        """Get the value (and compute it if not done yet)."""
        if not self._computed:
            self._value = self._compute()
            self._computed = True
        return self._value  # type: ignore