File: action_utils.py

package info (click to toggle)
freedombox 26.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 82,976 kB
  • sloc: python: 48,504; javascript: 1,736; xml: 481; makefile: 290; sh: 167; php: 32
file content (840 lines) | stat: -rw-r--r-- 28,410 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Python action utility functions.
"""

import logging
import os
import pathlib
import shutil
import subprocess
import tempfile
from contextlib import contextmanager

import augeas

from . import actions

logger = logging.getLogger(__name__)

UWSGI_ENABLED_PATH = '/etc/uwsgi/apps-enabled/{config_name}.ini'
UWSGI_AVAILABLE_PATH = '/etc/uwsgi/apps-available/{config_name}.ini'

# Flag on disk to indicate if freedombox package was held by
# plinth. This is a backup in case the process is interrupted and hold
# is not released.
apt_hold_flag = pathlib.Path('/var/lib/freedombox/package-held')


def is_systemd_running():
    """Return if we are running under systemd."""
    return os.path.exists('/run/systemd')


def systemd_get_default() -> str:
    """Return the default target that systemd will boot into."""
    process = run(['systemctl', 'get-default'], check=True)
    return process.stdout.decode().strip()


def systemd_set_default(target: str):
    """Set the default target that systemd will boot into."""
    run(['systemctl', 'set-default', target], check=True)


def service_daemon_reload():
    """Reload systemd to ensure that newer unit files are read."""
    run(['systemctl', 'daemon-reload'], check=True)


def service_is_running(servicename):
    """Return whether a service is currently running.

    Does not need to run as root.
    """
    try:
        run(['systemctl', 'status', servicename], check=True)
        return True
    except subprocess.CalledProcessError:
        # If a service is not running we get a status code != 0 and
        # thus a CalledProcessError
        return False


@contextmanager
def service_ensure_running(service_name):
    """Ensure a service is running and return to previous state."""
    starting_state = service_is_running(service_name)
    if not starting_state:
        service_enable(service_name)

    try:
        yield starting_state
    finally:
        if not starting_state:
            service_disable(service_name)


@contextmanager
def service_ensure_stopped(service_name):
    """Ensure a service is stopped and return to previous state."""
    starting_state = service_is_running(service_name)
    if starting_state:
        service_disable(service_name)

    try:
        yield starting_state
    finally:
        if starting_state:
            service_enable(service_name)


def service_is_enabled(service_name, strict_check=False):
    """Check if service is enabled in systemd.

    In some cases, after disabling a service, systemd puts it into a state
    called 'enabled-runtime' and returns a positive response to 'is-enabled'
    query. Until we understand better, a conservative work around is to pass
    strict=True to services effected by this behavior.

    """
    try:
        process = run(['systemctl', 'is-enabled', service_name], check=True)
        if not strict_check:
            return True

        return process.stdout.decode().strip() == 'enabled'
    except subprocess.CalledProcessError:
        return False


def service_enable(service_name: str, check: bool = False):
    """Enable and start a service in systemd."""
    run(['systemctl', 'enable', service_name], check=check)
    service_start(service_name, check=check)


def service_disable(service_name: str, check: bool = False):
    """Disable and stop service in systemd."""
    run(['systemctl', 'disable', service_name], check=check)
    try:
        service_stop(service_name, check=check)
    except subprocess.CalledProcessError:
        pass


def service_mask(service_name: str, check: bool = False):
    """Mask a service"""
    run(['systemctl', 'mask', service_name], check=check)


def service_unmask(service_name: str, check: bool = False):
    """Unmask a service"""
    run(['systemctl', 'unmask', service_name], check=check)


def service_start(service_name: str, check: bool = False):
    """Start a service with systemd."""
    service_action(service_name, 'start', check=check)


def service_stop(service_name: str, check: bool = False):
    """Stop a service with systemd."""
    service_action(service_name, 'stop', check=check)


def service_restart(service_name: str, check: bool = False):
    """Restart a service with systemd."""
    service_action(service_name, 'restart', check=check)


def service_try_restart(service_name: str, check: bool = False):
    """Try to restart a service with systemd."""
    service_action(service_name, 'try-restart', check=check)


def service_reload(service_name: str, check: bool = False):
    """Reload a service with systemd."""
    service_action(service_name, 'reload', check=check)


def service_try_reload_or_restart(service_name: str, check: bool = False):
    """Reload a service if it supports reloading, otherwise restart.

    Do nothing if service is not running.
    """
    service_action(service_name, 'try-reload-or-restart', check=check)


def service_reset_failed(service_name: str, check: bool = False):
    """Reset the 'failed' state of units."""
    service_action(service_name, 'reset-failed', check=check)


def service_get_logs(service_name: str) -> str:
    """Return the last lines of journal entries for a unit."""
    command = [
        'journalctl', '--no-pager', '--lines=200', '--unit', service_name
    ]
    process = run(command, check=False)
    return process.stdout.decode()


def service_show(service_name: str) -> dict[str, str]:
    """Return the status of the service in dictionary format."""
    command = ['systemctl', 'show', service_name]
    process = run(command, check=False)
    status = {}
    for line in process.stdout.decode().splitlines():
        parts = line.partition('=')
        status[parts[0]] = parts[2]

    return status


def service_action(service_name: str, action: str, check: bool = False):
    """Perform the given action on the service_name."""
    run(['systemctl', action, service_name], check=check)


def webserver_is_enabled(name, kind='config'):
    """Return whether a config/module/site is enabled in Apache."""
    if not shutil.which('a2query'):
        return False

    option_map = {'config': '-c', 'site': '-s', 'module': '-m'}
    try:
        # Don't print anything on the terminal
        run(['a2query', option_map[kind], name], check=True)
        return True
    except subprocess.CalledProcessError:
        return False


def webserver_enable(name, kind='config', apply_changes=True):
    """Enable a config/module/site in Apache.

    Restart/reload the webserver if apply_changes is True.  Return
    whether restart('restart'), reload('reload') or no action (None)
    is required.  If changes have been applied, then performed action
    is returned.
    """
    if webserver_is_enabled(name, kind) and kind == 'module':
        return

    command_map = {
        'config': 'a2enconf',
        'site': 'a2ensite',
        'module': 'a2enmod'
    }
    run([command_map[kind], name], check=True)

    action_required = 'restart' if kind == 'module' else 'reload'

    if apply_changes:
        if action_required == 'restart':
            service_restart('apache2')
        else:
            service_reload('apache2')

    return action_required


def webserver_disable(name, kind='config', apply_changes=True):
    """Disable config/module/site in Apache.

    Restart/reload the webserver if apply_changes is True.  Return
    whether restart('restart'), reload('reload') or no action (None)
    is required.  If changes have been applied, then performed action
    is returned.
    """
    if not webserver_is_enabled(name, kind):
        return

    command_map = {
        'config': 'a2disconf',
        'site': 'a2dissite',
        'module': 'a2dismod'
    }
    run([command_map[kind], name], check=True)

    action_required = 'restart' if kind == 'module' else 'reload'

    if apply_changes:
        if action_required == 'restart':
            service_restart('apache2')
        else:
            service_reload('apache2')

    return action_required


class WebserverChange:
    """Context to restart/reload Apache after configuration changes."""

    def __init__(self):
        """Initialize the context object state."""
        self.actions_required = set()

    def __enter__(self):
        """Return the context object so methods could be called on it."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Restart or reload the webserver.

        Don't suppress exceptions.  If an exception occurs
        restart/reload the webserver based on enable/disable
        operations done so far.
        """
        if 'restart' in self.actions_required:
            service_restart('apache2')
        elif 'reload' in self.actions_required:
            service_reload('apache2')

    def enable(self, name, kind='config'):
        """Enable a config/module/site in Apache.

        Don't apply the changes until the context is exited.
        """
        action_required = webserver_enable(name, kind, apply_changes=False)
        self.actions_required.add(action_required)

    def disable(self, name, kind='config'):
        """Disable a config/module/site in Apache.

        Don't apply the changes until the context is exited.
        """
        action_required = webserver_disable(name, kind, apply_changes=False)
        self.actions_required.add(action_required)


def uwsgi_is_enabled(config_name):
    """Return whether a uwsgi config is enabled."""
    enabled_path = UWSGI_ENABLED_PATH.format(config_name=config_name)
    return os.path.exists(enabled_path)


def uwsgi_enable(config_name):
    """Enable a uwsgi configuration that runs under uwsgi."""
    if uwsgi_is_enabled(config_name):
        return

    # uwsgi is started/stopped using init script. We don't know if it can
    # handle some configuration already running against newly enabled
    # configuration. So, stop first before enabling new configuration.
    service_stop('uwsgi')

    enabled_path = UWSGI_ENABLED_PATH.format(config_name=config_name)
    available_path = UWSGI_AVAILABLE_PATH.format(config_name=config_name)
    os.symlink(available_path, enabled_path)

    service_enable('uwsgi')
    service_start('uwsgi')


def uwsgi_disable(config_name):
    """Disable a uwsgi configuration that runs under uwsgi."""
    if not uwsgi_is_enabled(config_name):
        return

    # If uwsgi is restarted later, it won't stop the just disabled
    # configuration due to how init scripts are written for uwsgi.
    service_stop('uwsgi')
    enabled_path = UWSGI_ENABLED_PATH.format(config_name=config_name)
    os.unlink(enabled_path)
    service_start('uwsgi')


def get_addresses() -> list[dict[str, str | bool]]:
    """Return a list of IP addresses and hostnames."""
    addresses = get_ip_addresses()

    hostname = get_hostname()
    addresses.append({
        'kind': '4',
        'address': 'localhost',
        'numeric': False,
        'url_address': 'localhost'
    })
    addresses.append({
        'kind': '6',
        'address': 'localhost',
        'numeric': False,
        'url_address': 'localhost'
    })
    addresses.append({
        'kind': '4',
        'address': hostname,
        'numeric': False,
        'url_address': hostname
    })

    # XXX: When a hostname is resolved to IPv6 address, it may likely
    # be link-local address.  Link local IPv6 addresses are valid only
    # for a given link and need to be scoped with interface name such
    # as '%eth0' to work.  Tools such as curl don't seem to handle
    # this correctly.
    # addresses.append({'kind': '6', 'address': hostname, 'numeric': False})

    return addresses


def get_ip_addresses() -> list[dict[str, str | bool]]:
    """Return a list of IP addresses assigned to the system."""
    addresses = []

    output = run(['ip', '-o', 'addr'], check=True).stdout
    for line in output.decode().splitlines():
        parts = line.split()
        address: dict[str, str | bool] = {
            'kind': '4' if parts[2] == 'inet' else '6',
            'address': parts[3].split('/')[0],
            'url_address': parts[3].split('/')[0],
            'numeric': True,
            'scope': parts[5],
            'interface': parts[1]
        }

        if address['kind'] == '6' and address['numeric']:
            if address['scope'] != 'link':
                address['url_address'] = '[{0}]'.format(address['address'])
            else:
                address['url_address'] = '[{0}%{1}]'.format(
                    address['url_address'], address['interface'])

        addresses.append(address)

    return addresses


def get_hostname():
    """Return the current hostname."""
    return run(['hostname'], check=True).stdout.decode().strip()


def dpkg_reconfigure(package, config):
    """Reconfigure package using debconf database override."""
    override_template = '''
Name: {package}/{key}
Template: {package}/{key}
Value: {value}
Owners: {package}
'''
    override_data = ''
    for key, value in config.items():
        override_data += override_template.format(package=package, key=key,
                                                  value=value)

    with tempfile.NamedTemporaryFile(mode='w', delete=False) as override_file:
        override_file.write(override_data)

    env = os.environ.copy()
    env['DEBCONF_DB_OVERRIDE'] = 'File{' + override_file.name + \
                                 ' readonly:true}'
    env['DEBIAN_FRONTEND'] = 'noninteractive'
    run(['dpkg-reconfigure', package], env=env, check=False)

    try:
        os.remove(override_file.name)
    except OSError:
        pass


def debconf_set_selections(presets):
    """Answer debconf questions before installing a package."""
    try:
        # Workaround Debian Bug #487300. In some situations, debconf complains
        # it can't find the question being answered even though it is supposed
        # to create a dummy question for it.
        run(['/usr/share/debconf/fix_db.pl'], check=True)
    except (FileNotFoundError, PermissionError):
        pass

    presets = '\n'.join(presets)
    run(['debconf-set-selections'], input=presets.encode(), check=True)


def is_disk_image():
    """Return whether the current machine is from a disk image.

    Two primary ways to install FreedomBox are:
    - Using FreedomBox image for various hardware platforms.
    - Installing packages on a Debian machine using apt.
    """
    return os.path.exists('/var/lib/freedombox/is-freedombox-disk-image')


def run_apt_command(arguments, enable_triggers: bool = False):
    """Run apt-get with provided arguments."""
    command = ['apt-get', '--assume-yes', '--quiet=2'] + arguments

    env = os.environ.copy()
    env['DEBIAN_FRONTEND'] = 'noninteractive'
    if not enable_triggers:
        env['FREEDOMBOX_INVOKED'] = 'true'
    process = run(command, stdin=subprocess.DEVNULL, env=env, check=False)
    return process.returncode


@contextmanager
def apt_hold(packages):
    """Prevent packages from being removed during apt operations.

    `apt-mark hold PACKAGES` accepts a list of packages. But if one of
    the package is missing from the apt repository, then it will fail
    to hold any of the listed packages. So it is necessary to try to
    hold each package by itself.

    Packages held by this context will be unheld when leaving the
    context. But if a package was already held beforehand, it will be
    ignored (and not unheld).

    """
    held_packages = []
    try:
        for package in packages:
            current_hold = run(['apt-mark', 'showhold', package],
                               check=True).stdout
            if not current_hold:
                process = run(['apt-mark', 'hold', package], check=False)
                if process.returncode == 0:  # success
                    held_packages.append(package)

        yield held_packages
    finally:
        for package in held_packages:
            run(['apt-mark', 'unhold', package], check=True)


@contextmanager
def apt_hold_freedombox():
    """Prevent freedombox package from being removed during apt operations."""
    current_hold = run(['apt-mark', 'showhold', 'freedombox'],
                       check=True).stdout
    try:
        if current_hold:
            # Package is already held, possibly by administrator.
            yield current_hold
        else:
            # Set the flag.
            apt_hold_flag.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
            apt_hold_flag.touch(mode=0o660)
            yield run(['apt-mark', 'hold', 'freedombox'], check=True)
    finally:
        # Was the package held, either in this process or a previous one?
        if not current_hold or apt_hold_flag.exists():
            apt_unhold_freedombox()


def apt_unhold_freedombox():
    """Remove any hold on freedombox package, and clear flag."""
    run(['apt-mark', 'unhold', 'freedombox'], check=False)
    if apt_hold_flag.exists():
        apt_hold_flag.unlink()


def is_package_manager_busy():
    """Return whether package manager is busy.
    This command uses the `lsof` command to check whether the dpkg lock file
    is open which indicates that the package manager is busy"""
    LOCK_FILE = '/var/lib/dpkg/lock'
    try:
        run(['lsof', LOCK_FILE], check=True)
        return True
    except subprocess.CalledProcessError:
        return False


def podman_create(container_name: str, image_name: str, volume_name: str,
                  volume_path: str, volumes: dict[str, str] | None = None,
                  env: dict[str, str] | None = None,
                  binds_to: list[str] | None = None,
                  devices: dict[str, str] | None = None):
    """Remove and recreate a podman container."""
    service_stop(f'{volume_name}-volume.service')
    service_stop(container_name)

    # Data is kept
    run(['podman', 'volume', 'rm', '--force', volume_name], check=False)

    directory = pathlib.Path('/etc/containers/systemd')
    directory.mkdir(parents=True, exist_ok=True)

    # Fetch the image before creating the container. The systemd service for
    # the container won't timeout due to slow internet connectivity.
    run(['podman', 'image', 'pull', image_name], check=True)

    pathlib.Path(volume_path).mkdir(parents=True, exist_ok=True)
    # Create storage volume
    volume_file = directory / f'{volume_name}.volume'
    contents = f'''[Volume]
Device={volume_path}
Driver=local
VolumeName={volume_name}
Options=bind
'''
    volume_file.write_text(contents)

    service_file = directory / f'{container_name}.container'
    volume_lines = '\n'.join([
        f'Volume={source}:{dest}' for source, dest in (volumes or {}).items()
    ])
    env_lines = '\n'.join(
        [f'Environment={key}={value}' for key, value in (env or {}).items()])
    bind_lines = '\n'.join(f'BindsTo={service}\nAfter={service}'
                           for service in (binds_to or []))
    devices_lines = '\n'.join(f'AddDevice={source}:{dest}'
                              for source, dest in (devices or {}).items()
                              if pathlib.Path(source).exists())
    contents = f'''[Unit]
Requires={volume_name}-volume.service
After={volume_name}-volume.service
{bind_lines}

[Container]
AutoUpdate=registry
ContainerName=%N
{env_lines}
Image={image_name}
Network=host
{volume_lines}
{devices_lines}

[Service]
Restart=always

[Install]
WantedBy=default.target
'''
    service_file.write_text(contents)

    # Remove the fallback service file when upgrading from bookworm to trixie.
    # Re-running setup should be sufficient.
    _podman_create_fallback_service_file(container_name, image_name,
                                         volume_name, volume_path, volumes,
                                         env, binds_to, devices)

    service_daemon_reload()


def _podman_create_fallback_service_file(
        container_name: str, image_name: str, volume_name: str,
        volume_path: str, volumes: dict[str, str] | None = None,
        env: dict[str, str] | None = None, binds_to: list[str] | None = None,
        devices: dict[str, str] | None = None):
    """Create a systemd unit file if systemd generator is not available."""
    service_file = pathlib.Path(
        f'/etc/systemd/system/{container_name}.service')

    generator = '/usr/lib/systemd/system-generators/podman-system-generator'
    if pathlib.Path(generator).exists():
        # If systemd generator is present, during an upgrade, remove the
        # .service file (perhaps created when generator is not present).
        service_file.unlink(missing_ok=True)
        return

    service_file.parent.mkdir(parents=True, exist_ok=True)
    bind_lines = '\n'.join(f'BindsTo={service}\nAfter={service}'
                           for service in (binds_to or []))
    require_mounts_for = '\n'.join((f'RequiresMountsFor={host_path}'
                                    for host_path in (volumes or {})
                                    if host_path.startswith('/')))
    env_args = ' '.join(
        (f'--env {key}={value}' for key, value in (env or {}).items()))
    volume_args = ' '.join(
        (f'-v {host_path}:{container_path}'
         for host_path, container_path in (volumes or {}).items()))
    devices_args = ' '.join(
        (f'-d {host_path}:{container_path}'
         for host_path, container_path in (devices or {}).items()
         if pathlib.Path(host_path).exists()))

    # Similar to the file quadlet systemd generator produces but with volume
    # related commands merged.
    contents = f'''[Unit]
{bind_lines}
RequiresMountsFor=%t/containers
{require_mounts_for}

[Service]
Restart=always
Environment=PODMAN_SYSTEMD_UNIT=%n
KillMode=mixed
ExecStop=/usr/bin/podman rm -v -f -i --cidfile=%t/%N.cid
ExecStopPost=-/usr/bin/podman rm -v -f -i --cidfile=%t/%N.cid
Delegate=yes
Type=notify
NotifyAccess=all
SyslogIdentifier=%N
ExecStartPre=/usr/bin/rm -f %t/%N.cid
ExecStartPre=/usr/bin/podman volume rm --force {volume_name}
ExecStartPre=/usr/bin/podman volume create --driver=local --opt device={volume_path} --opt o=bind {volume_name}
ExecStart=/usr/bin/podman run --name=%N --cidfile=%t/%N.cid --replace --rm --cgroups=split --network=host --sdnotify=conmon --detach --label io.containers.autoupdate=registry {volume_args} {env_args} {devices_args} {image_name}

[Install]
WantedBy=default.target
'''  # noqa: E501
    service_file.write_text(contents, encoding='utf-8')
    service_daemon_reload()


def _podman_augeus(container_name: str):
    """Return an augues instance to edit container configuration file."""
    aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
                        augeas.Augeas.NO_MODL_AUTOLOAD)
    container = f'/etc/containers/systemd/{container_name}.container'
    aug.transform('Systemd', container)
    aug.set('/augeas/context', '/files' + container)
    aug.load()
    return aug


def podman_is_enabled(container_name: str) -> bool:
    """Return whether the container to start on boot."""
    aug = _podman_augeus(container_name)
    aug = _podman_augeus(container_name)
    value = 'default.target'
    key = 'Install/WantedBy'
    return any(
        (aug.get(match_ + '/value') == value for match_ in aug.match(key)))


def podman_enable(container_name: str):
    """Enable container to start on boot."""
    aug = _podman_augeus(container_name)
    value = 'default.target'
    key = 'Install/WantedBy'
    found = any(
        (aug.get(match_ + '/value') == value for match_ in aug.match(key)))
    if not found:
        aug.set(f'{key}[last() +1]/value', value)
        aug.save()


def podman_disable(container_name: str):
    """Disable container to start on boot."""
    aug = _podman_augeus(container_name)
    aug.remove('Install/WantedBy')
    aug.save()


def podman_uninstall(container_name: str, volume_name: str, image_name: str,
                     volume_path: str):
    """Remove a podman container's components and systemd unit."""
    run(['podman', 'volume', 'rm', '--force', volume_name], check=True)
    run(['podman', 'image', 'rm', '--ignore', image_name], check=True)
    volume_file = pathlib.Path(
        '/etc/containers/systemd/') / f'{volume_name}.volume'
    volume_file.unlink(missing_ok=True)
    service_file = pathlib.Path(
        '/etc/containers/systemd/') / f'{container_name}.container'
    service_file.unlink(missing_ok=True)
    # Remove fallback service file
    service_file = pathlib.Path(
        '/etc/systemd/system/') / f'{container_name}.service'
    service_file.unlink(missing_ok=True)
    shutil.rmtree(volume_path, ignore_errors=True)
    service_daemon_reload()


def move_uploaded_file(source: str | pathlib.Path,
                       destination_dir: str | pathlib.Path,
                       destination_file_name: str,
                       allow_overwrite: bool = False, user: str = 'root',
                       group: str = 'root', permissions: int = 0o644):
    """Move an uploaded file from Django upload directory to a destination.

    Sets the expected ownership and permissions on the destination file. If
    possible, performs a simple rename operation. Otherwise, copies the file to
    the destination.

    The source must be regular file under the currently configured Django file
    upload directory. It must be a absolute path that can be verified to be
    under Django settings FILE_UPLOAD_TEMP_DIR.

    The destination_dir must be a directory. destination_file_name must be a
    simple file name without any other path components. When concatenated
    together, they specify the full destination path to move the file to.

    If allow_overwrite is set to False and destination file exists, an
    exception is raised.
    """
    from plinth import settings

    if isinstance(source, str):
        source = pathlib.Path(source)

    if isinstance(destination_dir, str):
        destination_dir = pathlib.Path(destination_dir)

    source = source.resolve(strict=True)
    destination_dir = destination_dir.resolve()

    # Verify source file
    if not source.is_file():
        raise ValueError('Source is not a file')

    tmp_dir = pathlib.Path(getattr(settings, 'FILE_UPLOAD_TEMP_DIR', '/tmp'))
    if not source.is_relative_to(tmp_dir):
        raise ValueError('Uploaded file is not in expected temp directory')

    # Verify destination directory
    if not destination_dir.is_dir():
        raise ValueError('Destination is not a directory')

    # Verify destination file name
    if len(pathlib.Path(destination_file_name).parts) != 1:
        raise ValueError('Invalid destination file name')

    destination = destination_dir / destination_file_name

    if destination.exists() and not allow_overwrite:
        raise FileExistsError('Destination already exists')

    # Move or copy
    shutil.move(source, destination)
    shutil.chown(destination, user, group)
    destination.chmod(permissions)


def run_as_user(command, username, **kwargs):
    """Run a command as another user.

    Uses 'runuser' which is similar to 'su'. Creates PAM session unlike
    setpriv. Sets real/effective uid/gid and resets the environment.
    """
    command = ['runuser', '--user', username, '--'] + command
    return run(command, **kwargs)


def run(command, **kwargs):
    """Run subprocess.run but capture stdout and stderr in thread storage."""
    collect_stdout = ('stdout' not in kwargs
                      and 'capture_output' not in kwargs)
    collect_stderr = ('stderr' not in kwargs
                      and 'capture_output' not in kwargs)

    if collect_stdout:
        kwargs['stdout'] = subprocess.PIPE

    if collect_stderr:
        kwargs['stderr'] = subprocess.PIPE

    try:
        process = subprocess.run(command, **kwargs)
        if collect_stdout and hasattr(actions.thread_storage, 'stdout'):
            actions.thread_storage.stdout += process.stdout

        if collect_stderr and hasattr(actions.thread_storage, 'stderr'):
            actions.thread_storage.stderr += process.stderr
    except subprocess.CalledProcessError as exception:
        if exception.stdout and hasattr(actions.thread_storage, 'stdout'):
            actions.thread_storage.stdout += exception.stdout

        if exception.stderr and hasattr(actions.thread_storage, 'stderr'):
            actions.thread_storage.stderr += exception.stderr

        raise exception

    return process