File: library.py

package info (click to toggle)
freedom-maker 0.35
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 408 kB
  • sloc: python: 2,200; xml: 357; makefile: 10
file content (580 lines) | stat: -rw-r--r-- 20,578 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Library of small actions that make up image building.

Most simple wrappers over command line utilities without any understanding of
state of build process.
"""

import contextlib
import logging
import os
import re
import shutil
import subprocess
import tempfile
import threading

from . import releases

logger = logging.getLogger(__name__)


def run(process_args, **kwargs):
    """Run a command."""

    def _output_logger(file_handle, out_buffer, prompt):
        """Read from stdout/stderr file handle and print to stdout."""
        while (True):
            read_bytes = file_handle.readline()
            if not read_bytes:
                break

            out_buffer.append(read_bytes)
            logger.debug(f'{prompt}%s', read_bytes.decode().strip())

        file_handle.close()

    logger.info('Executing command - %s %s', process_args, dict(kwargs))
    stdin_buffer = kwargs.pop('input', None)
    check = kwargs.pop('check', True)
    environ = kwargs['env'] if 'env' in kwargs else os.environ.copy()
    environ['LC_ALL'] = 'C'
    environ['LANGUAGE'] = 'C'
    environ['LANG'] = 'C'
    environ['DEBIAN_FRONTEND'] = 'noninteractive'
    environ['DEBCONF_NONINTERACTIVE_SEEN'] = 'true'
    kwargs['env'] = environ
    kwargs['stdout'] = subprocess.PIPE
    kwargs['stderr'] = subprocess.PIPE
    kwargs['stdin'] = subprocess.PIPE if stdin_buffer else None
    process = subprocess.Popen(process_args, **kwargs)

    stdout_buffer = []
    stdout_thread = threading.Thread(
        target=_output_logger, args=[process.stdout, stdout_buffer, '> '])
    stdout_thread.start()
    stderr_buffer = []
    stderr_thread = threading.Thread(
        target=_output_logger, args=[process.stderr, stderr_buffer, '2> '])
    stderr_thread.start()
    if stdin_buffer:
        process.stdin.write(stdin_buffer)
        process.stdin.close()

    stdout_thread.join()
    stderr_thread.join()
    return_code = process.wait()

    stdout_buffer = b''.join(stdout_buffer) or None
    stderr_buffer = b''.join(stderr_buffer) or None

    if check and return_code:
        logger.error('Command failed: %s\n%s\n%s', ' '.join(process_args),
                     stdout_buffer, stderr_buffer)
        raise subprocess.CalledProcessError(return_code, process_args,
                                            stdout_buffer, stderr_buffer)

    return stdout_buffer


def run_in_chroot(state, *args, **kwargs):
    """Run a command inside chroot of mount point."""
    args = [['chroot', state['mount_point']] + arg for arg in args]
    return run(*args, **kwargs)


def path_in_mount(state, path):
    """Return the path inside the mount point of image.

    If an absolute path is provided as path, it will returned as it is.

    """
    return os.path.join(state['mount_point'], path)


def schedule_cleanup(state, method, *args, **kwargs):
    """Make a note of the cleanup operations to happen."""
    state.setdefault('cleanup', []).append([method, args, kwargs])


def cleanup(state):
    """Run all the scheduled cleanups in reverse order."""
    state.setdefault('cleanup', [])
    for cleanup_step in reversed(state['cleanup']):
        method, args, kwargs = cleanup_step
        method(*args, **kwargs)


def create_ram_directory_image(state, image_file, size):
    """Create a temporary RAM directory."""
    logger.info('Create RAM directory for image: %s (%s)', image_file, size)
    directory = tempfile.TemporaryDirectory()
    run([
        'mount', '-o', 'size=' + size, '-t', 'tmpfs', 'tmpfs', directory.name
    ])

    state['ram_directory'] = directory
    temp_image_file = os.path.join(directory.name,
                                   os.path.basename(image_file))
    state['image_file'] = temp_image_file

    schedule_cleanup(state, remove_ram_directory, directory)
    schedule_cleanup(state, copy_image, state, temp_image_file, image_file)


def remove_ram_directory(directory):
    """Remove RAM directory created for temporary image path."""
    logger.info('Cleanup RAM directory %s', directory)
    run(['umount', directory.name])
    directory.cleanup()


def copy_image(state, source_image, target_image):
    """Copy from temp image to target path."""
    if not state['success']:
        target_image += '.failed'

    logger.info('Copying file: %s -> %s', source_image, target_image)
    run(['cp', '--sparse=always', source_image, target_image])


def create_temp_image(state, image_file):
    """Create a temp image for a target image file on disk."""
    temp_image_file = image_file + '.temp'
    logger.info('Creating temp image %s for image %s', temp_image_file,
                image_file)
    state['image_file'] = temp_image_file

    schedule_cleanup(state, move_image, state, temp_image_file, image_file)


def move_image(state, source_image, target_image):
    """Move from temp image name to final image name."""
    if not state['success']:
        target_image += '.failed'

    logger.info('Moving image: %s -> %s', source_image, target_image)
    run(['mv', source_image, target_image])


def create_image(state, size):
    """Create an empty sparse file using qemu-image."""
    logger.info('Creating image %s of size %s', state['image_file'], size)
    run(['qemu-img', 'create', '-f', 'raw', state['image_file'], size])


def create_partition_table(state, partition_table_type):
    """Create an empty partition table in given device."""
    logger.info('Creating partition table on %s of type %s',
                state['image_file'], partition_table_type)
    run(['parted', '-s', state['image_file'], 'mklabel', partition_table_type])


def create_partition(state, label, start, end, filesystem_type):
    """Create a primary partition in a given device."""
    filesystem_map = {'vfat': 'fat32'}
    filesystem_type = filesystem_map.get(filesystem_type, filesystem_type)

    partition_type = 'primary'
    logger.info('Creating partition %s in %s (range %s - %s) of type %s',
                label, state['image_file'], start, end, filesystem_type)
    run([
        'parted', '-s', state['image_file'], 'mkpart', partition_type,
        filesystem_type, start, end
    ])

    state.setdefault('partitions', []).append(label)


def set_flag(state, partition_number, flag):
    """Set a flag on a partition of a device."""
    logger.info('Setting %s flag on %s partition for %s', flag,
                partition_number, state['image_file'])
    run([
        'parted', '-s', state['image_file'], 'set',
        str(partition_number), flag, 'on'
    ])


def loopback_setup(state):
    """Perform mapping to loopback devices from partitions in image file."""
    logger.info('Setting up loopback mappings for %s', state['image_file'])
    output = run(['kpartx', '-asv', state['image_file']]).decode()
    loop_device = None
    devices = []
    partition_number = 0
    for line in output.splitlines():
        columns = line.split()
        if columns[0] == 'add':
            device = '/dev/mapper/{}'.format(columns[2])
            label = state['partitions'][partition_number]
            state.setdefault('devices', {})[label] = device
            devices.append(device)
            partition_number += 1
            if not loop_device:
                loop_device = re.match(r'^(loop\d+)p\d+$', columns[2])[1]
                loop_device = '/dev/{}'.format(loop_device)
                state['loop_device'] = loop_device

    # Cleanup runs in reverse order
    if loop_device:
        schedule_cleanup(state, force_release_loop_device, loop_device)

    for device in devices:
        schedule_cleanup(state, force_release_partition_loop, device)

    schedule_cleanup(state, loopback_teardown, state['image_file'])


def force_release_partition_loop(loop_device):
    """Force release a partition mapping on a loop device."""
    logger.info('Force releasing partition on loop device %s', loop_device)
    run(['dmsetup', 'remove', loop_device], check=False)


def force_release_loop_device(loop_device):
    """Force release of a loop setup for entire device"""
    logger.info('Force releasing loop setup for device %s', loop_device)
    run(['losetup', '-d', loop_device], check=False)


def loopback_teardown(image_file):
    """Unmap loopback devices from partitions in image file."""
    logger.info('Tearing down loopback mappings for %s', image_file)
    run(['kpartx', '-dsv', image_file])


def create_filesystem(device, filesystem_type):
    """Create a filesystem on a given device."""
    logger.info('Creating filesystem on %s of type %s', device,
                filesystem_type)
    run(['mkfs', '-t', filesystem_type, device])

    # Due to a bug, probably in udev, /dev/disk/by-uuid/<uuid> link is not
    # reliably created after the creation of the filesystem. This leads to
    # update-grub using root=/dev/mapper/loop0p1 instead of root=UUID=<uuid>
    # when creating grub.cfg. This results in an unbootable image. Force udev
    # events as a workaround.
    run(['udevadm', 'trigger', device])
    run(['udevadm', 'settle'])


def mount_filesystem(state,
                     label_or_path,
                     sub_mount_point,
                     is_bind_mount=False,
                     filesystem_type=None):
    """Mount a device on a mount point."""
    if not sub_mount_point:
        mount_point = tempfile.mkdtemp()
        state['mount_point'] = mount_point
    else:
        mount_point = path_in_mount(state, sub_mount_point)
        os.makedirs(mount_point, exist_ok=True)

    options = []
    if filesystem_type == 'btrfs':
        options += ['-o', 'discard=sync']

    if not is_bind_mount:
        device = state['devices'][label_or_path]
    else:
        device = label_or_path
        options += ['-o', 'bind']

    logger.info('Mounting device %s on %s with options %s', device,
                mount_point, options)
    run(['mount', device, mount_point] + options)
    state.setdefault('sub_mount_points', {})[label_or_path] = sub_mount_point

    schedule_cleanup(state, unmount_filesystem, device, mount_point)


def unmount_filesystem(device, mount_point, check=True):
    """Unmount a filesystem."""
    logger.info('Unmounting device %s from mount point %s', device,
                mount_point)
    run(['umount', mount_point], check=check)


def process_cleanup(state):
    """Kill all processes using a given mount point."""
    mount_point = state['mount_point']
    logger.info('Killing all processes on the mount point %s', mount_point)
    run(['fuser', '-mvk', mount_point], check=False)
    # XXX: Twice seems to work better?
    run(['fuser', '-mvk', mount_point], check=False)


def debootstrap(state, architecture, distribution, variant, components,
                packages, mirror):
    """Debootstrap into a mounted directory."""
    target = state['mount_point']
    logger.info(
        'Debootstraping into %s, architecture %s, '
        'distribution %s, variant %s, components %s, build mirror %s', target,
        architecture, distribution, variant, components, mirror)
    try:
        run([
            'debootstrap', '--arch=' + architecture, '--variant=' + variant,
            '--components=' + ','.join(components),
            '--include=' + ','.join(packages), distribution, target, mirror
        ])
    except (Exception, KeyboardInterrupt):
        run(['tail', os.path.join(target, 'debootstrap/debootstrap.log')])
        logger.info(
            'Unmounting filesystems that may have been left by debootstrap')
        for path in ('proc', 'sys', 'etc/machine-id'):
            unmount_filesystem(None, os.path.join(target, path), check=False)
        raise

    # During bootstrap, /etc/machine-id path might be bind mounted.
    schedule_cleanup(state,
                     unmount_filesystem,
                     None,
                     os.path.join(target, 'etc/machine-id'),
                     check=False)


@contextlib.contextmanager
def no_run_daemon_policy(state):
    """Context manager to ensure daemons are not run during installs."""
    logger.info('Enforcing policy not to run daemons')
    path = path_in_mount(state, 'usr/sbin/policy-rc.d')
    content = '''#!/bin/sh
exit 101
'''
    with open(path, 'w') as file_handle:
        file_handle.write(content)

    os.chmod(path, 0o755)
    yield

    logger.info('Relaxing policy not to run daemons')
    os.unlink(path)


def install_from_backports(state, packages):
    """Install a package using apt from backports."""

    with no_run_daemon_policy(state):
        try:
            logger.info('Installing packages from backports: %s',
                        " ".join(packages))
            run_in_chroot(state, [
                'apt-get', 'install', '-y', *[
                    f'{package}/{releases.STABLE_CODENAME}-backports'
                    for package in packages
                ]
            ])
        except cliapp.AppException:
            logger.info(
                'Packages not found in backports. Installing from stable.')
            run_in_chroot(state, ['apt-get', 'install', '-y', *packages])

    run_in_chroot(state, ['apt-get', 'autoremove', '-y'])


def install_package(state, package):
    """Install a package using apt."""
    logger.info('Installing package %s', package)

    with no_run_daemon_policy(state):
        run_in_chroot(state, ['apt-get', 'install', '-y', package])


def install_custom_package(state, package_path):
    """Install a custom .deb file."""
    logger.info('Install custom .deb package %s', package_path)
    sub_destination = os.path.join('tmp', os.path.basename(package_path))
    destination_path = path_in_mount(state, sub_destination)
    shutil.copyfile(package_path, destination_path)
    package_path = os.path.join('/tmp', os.path.basename(package_path))
    with no_run_daemon_policy(state):
        run_in_chroot(
            state,
            ['apt', 'install', '-y', '--allow-downgrades', package_path])


def set_hostname(state, hostname):
    """Set the hostname inside the image."""
    logger.info('Setting hostname to %s', hostname)

    etc_hostname_path = path_in_mount(state, 'etc/hostname')
    with open(etc_hostname_path, 'w') as file_handle:
        file_handle.write(hostname + '\n')

    etc_hosts_path = path_in_mount(state, 'etc/hosts')
    with open(etc_hosts_path, 'r') as file_handle:
        lines = file_handle.readlines()
    with open(etc_hosts_path, 'w') as file_handle:
        appended = False
        for line in lines:
            if line.startswith('127.0.1.1'):
                line = line + ' ' + hostname
                appended = True

            file_handle.write(line)

        if not appended:
            file_handle.write('127.0.1.1 ' + hostname + '\n')


def get_fstab_options(filesystem_type):
    """Return options to use for a filesystem type."""
    if filesystem_type == 'btrfs':
        # Enable btrfs transparent compression. See:
        # https://fedoraproject.org/wiki/Changes/BtrfsTransparentCompression
        flags = ['compress=zstd:1']
    else:
        flags = ['errors=remount-ro']

    return ','.join(flags) or 'defaults'


def get_uuid_of_device(device):
    """Return the UUID of a given device."""
    output = run(['blkid', '--output=value', '--match-tag=UUID', device])
    return output.decode().strip()


def add_fstab_entry(state, label, filesystem_type, pass_number, append=True):
    """Add an entry in /etc/fstab for a disk partition."""
    file_path = path_in_mount(state, 'etc/fstab')
    device = 'UUID={}'.format(get_uuid_of_device(state['devices'][label]))
    values = {
        'device': device,
        'mount_point': '/' + (state['sub_mount_points'][label] or ''),
        'filesystem_type': filesystem_type,
        'options': get_fstab_options(filesystem_type),
        'frequency': '0',
        'pass_number': pass_number
    }
    line = '{device} {mount_point} {filesystem_type} {options} {frequency} ' \
           '{pass_number}\n'
    line = line.format(**values)
    logger.info('Adding fstab entry %s', values)

    mode = 'a' if append else 'w'
    with open(file_path, mode) as file_handle:
        file_handle.write(line)


def install_grub(state, target=None, is_efi=False, secure_boot=False):
    """Install grub boot loader on the loop back device."""
    device = state['loop_device']
    logger.info('Installing grub boot loader on device %s', device)
    run_in_chroot(state, ['update-grub'])
    args = [f'--target={target}'] if target else []
    args += ['--no-nvram'] if is_efi else []
    args += ['--uefi-secure-boot'] if secure_boot else []
    run_in_chroot(state, ['grub-install', device] + args)


def setup_apt(state, mirror, distribution, components, enable_backports=False):
    """Setup apt sources and update the cache."""
    logger.info('Setting apt for mirror %s', mirror)
    values = {
        'mirror': mirror,
        'distribution': distribution,
        'components': ' '.join(components)
    }
    basic_template = '''
deb {mirror} {distribution} {components}
deb-src {mirror} {distribution} {components}
'''
    updates_template = '''
deb {mirror} {distribution}-updates {components}
deb-src {mirror} {distribution}-updates {components}
'''
    security_template = '''
deb http://security.debian.org/debian-security/ {distribution}-security {components}
deb-src http://security.debian.org/debian-security/ {distribution}-security {components}
'''
    backports_template = f'''
deb http://deb.debian.org/debian {releases.STABLE_CODENAME}-backports main
deb-src http://deb.debian.org/debian {releases.STABLE_CODENAME}-backports main
'''
    file_path = path_in_mount(state, 'etc/apt/sources.list')
    with open(file_path, 'w') as file_handle:
        file_handle.write(basic_template.format(**values))
        if distribution not in ('sid', 'unstable'):
            file_handle.write(updates_template.format(**values))
            if enable_backports:
                file_handle.write(backports_template)

            file_handle.write(security_template.format(**values))

    run_in_chroot(state, ['apt-get', 'update'])
    run_in_chroot(state, ['apt-get', 'clean'])


def setup_flash_kernel(state, machine_name, kernel_options,
                       boot_filesystem_type):
    """Setup and install flash-kernel package."""
    logger.info('Setting up flash kernel for machine %s with options %s',
                machine_name, kernel_options)
    directory_path = path_in_mount(state, 'etc/flash-kernel')
    os.makedirs(directory_path, exist_ok=True)

    file_path = path_in_mount(state, 'etc/flash-kernel/machine')
    with open(file_path, 'w') as file_handle:
        file_handle.write(machine_name)

    if kernel_options:
        stdin = 'flash-kernel flash-kernel/linux_cmdline string {}'.format(
            kernel_options)
        run_in_chroot(state, ['debconf-set-selections'], input=stdin.encode())

    run_in_chroot(state, ['apt-get', 'install', '-y', 'flash-kernel'])

    # flash-kernel creates links in /boot and does not work when the filesystem
    # is vfat.
    if boot_filesystem_type != 'vfat':
        run_in_chroot(state, ['flash-kernel'])


def update_initramfs(state):
    """Update the initramfs in the disk image to make it use fstab etc."""
    logger.info('Updating initramfs')
    run_in_chroot(state, ['update-initramfs', '-u'])


def install_boot_loader_part(state, path, seek, size, count=None):
    """Do a dd copy for a file onto the disk image."""
    image_file = state['image_file']
    full_path = path_in_mount(state, path)
    logger.info('Installing boot loader part %s at seek=%s, size=%s, count=%s',
                full_path, seek, size, count)
    command = [
        'dd', 'if=' + full_path, 'of=' + image_file, 'seek=' + seek,
        'bs=' + size, 'conv=notrunc'
    ]
    if count:
        command.append('count=' + count)

    run(command)


def compress(archive_file, image_file):
    """Compress an image using xz."""
    logger.info('Compressing file %s to %s', image_file, archive_file)
    # Take upto 50% RAM when doing compression. Otherwise, number of threads
    # will be reduced to accommodate.
    command = [
        'xz', '--no-warn', '--threads=0', '--memlimit=50%', '-9', '--force'
    ]
    run(command + [image_file])


def sign(archive):
    """Sign an image using GPG."""
    logger.info('Signing file %s with GPG', archive)
    signature = archive + '.sig'
    try:
        os.remove(signature)
    except FileNotFoundError:
        pass

    run(['gpg', '--output', signature, '--detach-sig', archive])