File: mirror_download.py

package info (click to toggle)
simple-cdd 0.6.9
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid, trixie
  • size: 344 kB
  • sloc: python: 1,904; sh: 111; makefile: 13
file content (155 lines) | stat: -rw-r--r-- 7,350 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from simple_cdd.exceptions import Fail
from simple_cdd.utils import run_command, Checksums
from simple_cdd.gnupg import Gnupg
from .base import Tool
from urllib.parse import urlparse, urljoin
from urllib import request
import os
import re
import logging

# Module-wide logger.  Note this is the *root* logger (no name argument),
# matching the project's existing convention rather than getLogger(__name__).
log = logging.getLogger()

@Tool.register
class ToolMirrorDownload(Tool):
    """Download and verify debian-installer files from a Debian mirror.

    Fetches the configured checksum files and the Release file for
    DI_CODENAME (verifying its detached GPG signature), any paths listed
    in ``mirror_files`` (verified against the inline-signed ``extrafiles``
    manifest), and every file from the checksum lists whose relative path
    matches the ``di_match_files`` regex.  Every downloaded file is checked
    against its published checksum via :class:`Checksums`.
    """

    # Tool registry key: this runs as the "download" step of the "mirror" stage.
    type = "mirror"
    name = "download"

    def __init__(self, env):
        # env is the simple-cdd environment object (provides get()/format()).
        self.env = env
        self.gnupg = Gnupg(env)

    def check_pre(self):
        """Check configuration prerequisites before running.

        Raises:
            Fail: when a standard installer download is requested (neither
                DI_WWW_HOME nor custom_installer is set) but checksum_files
                or di_match_files is empty.
        """
        super().check_pre()
        # With DI_WWW_HOME or custom_installer the installer comes from
        # another source, so the checksum settings are not required here.
        if not self.env.get("DI_WWW_HOME") and not self.env.get("custom_installer"):
            if not self.env.get("checksum_files"):
                raise Fail("Cannot run mirror/download: checksum_files is empty")
            if not self.env.get("di_match_files"):
                raise Fail("Cannot run mirror/download: di_match_files is empty")

    def run(self):
        """Perform the downloads and verifications described on the class."""
        env = self.env
        logdir = env.get("simple_cdd_logs")
        logfilename = os.path.join(logdir, "{}-{}.log".format(self.type, self.name))

        with open(logfilename, "wt") as logfd:
            # NOTE(review): logfd is opened but never written to in this
            # method — confirm whether an empty log file is intentional.
            baseurl = env.get("files_debian_mirror")
            # NOTE(review): path_depth is computed but not referenced below;
            # looks like leftover code — confirm before removing.
            path_depth = urlparse(baseurl).path.strip("/").count("/") + 1

            # Export the configured proxy for urllib; setdefault keeps any
            # proxy already present in the process environment.
            if env.get("http_proxy"):
                os.environ.setdefault('http_proxy', env.get("http_proxy"))

            def _download(url, output, checksums=None, relname=None):
                # Fetch url into output.  When checksums is given, skip the
                # download if the existing file already verifies, and verify
                # the freshly downloaded file afterwards (verify_file raises
                # Fail on mismatch; relname is the key in the checksum list).
                if checksums:
                    if os.path.exists(output):
                        try:
                            checksums.verify_file(output, relname)
                            log.debug("skipping download: %s checksum matched", output)
                            return
                        except Fail:
                            log.debug("re-downloading: %s checksum invalid", output)
                            pass
                # Create the destination directory on first use.
                if not os.path.isdir(os.path.dirname(output)):
                    os.makedirs(os.path.dirname(output))
                log.debug("downloading: %s", output)
                request.urlretrieve(url, filename=output)
                if checksums:
                    checksums.verify_file(output, relname)

            if env.get("mirror_files"):
                # Download the checksums present in the archive "extrafiles" and verify
                extrafiles_file_inlinesig = os.path.join(env.get("MIRROR"), "extrafiles")
                extrafiles_file= os.path.join(env.get("simple_cdd_temp"), "extrafiles.unsigned")
                download_extrafiles_file = os.path.join(env.get("files_debian_mirror"), "extrafiles")
                _download(download_extrafiles_file, extrafiles_file_inlinesig)
                self.gnupg.verify_inline_sig(extrafiles_file_inlinesig)
                # Strip the inline signature, leaving the bare checksum list.
                self.gnupg.extract_inline_contents(extrafiles_file, extrafiles_file_inlinesig)

                # import checksums
                extrafile_sums = Checksums(self.env)
                extrafile_sums.parse_checksums_file(extrafiles_file, 'SHA256')

                with open(extrafiles_file, 'r') as ef:
                    efile = ef.readlines()
                # Build one alternation regex from mirror_files: entries
                # ending in "/" match as directory prefixes, all others must
                # match a complete path (anchored with "$").
                match_mirror_files = []
                for m in env.get("mirror_files"):
                    if m.endswith('/'):
                        match_mirror_files.append(re.escape(m))
                    else:
                        match_mirror_files.append(re.escape(m) + "$")
                match_mirror_files = "(" + "|".join(match_mirror_files) + ")"
                ef_match = re.compile(match_mirror_files)
                ef_files = []
                for line in efile:
                    # Each manifest line is "<sha256> <relative path>".
                    hashsum, relname = line.split()
                    if ef_match.match(relname):
                        ef_files.append({
                            "absname": os.path.join(env.get("MIRROR"), relname),
                            "relname": relname,
                            "url": os.path.join(env.get("files_debian_mirror"), relname),
                        })

                # Fetch every matched extra file, verified against the
                # signed extrafiles manifest.
                for x in ef_files:
                    _download(x["url"], x["absname"], checksums=extrafile_sums, relname=x["relname"])

            checksum_files = env.get("checksum_files")

            # Download files needed to build debian-installer image
            # NOTE(review): `files` is populated but never used below —
            # possibly leftover; confirm before removing.
            files = []
            files.extend(checksum_files)

            if checksum_files:
                # Get the release file and verify that it is valid
                release_file = os.path.join(env.get("simple_cdd_temp"), env.format("{DI_CODENAME}_Release"))
                download_release_file = os.path.join(env.get("files_debian_mirror"), "dists", env.get("DI_CODENAME"), "Release")
                _download(download_release_file, release_file)
                _download(download_release_file + ".gpg", release_file + ".gpg")
                self.gnupg.verify_detached_sig(release_file, release_file + ".gpg")

                # Parse the release file for checksums
                sums = Checksums(self.env)
                sums.parse_release_file(release_file)

                # Ensure that the checksum files are those referenced in the Release file
                # And build a list of additional files to download, matching
                # di_match_files in the checksum files contents
                di_match = re.compile(env.get("di_match_files"))
                for file in checksum_files:
                    # The hash type is inferred from the checksum file's name.
                    if file.endswith("SHA256SUMS"):
                        hashtype = "SHA256"
                    elif file.endswith("MD5SUMS"):
                        hashtype = "MD5Sum"
                    else:
                        log.warning("Unknown hash type for %s, skipping file", file)
                        continue

                    # Split the path at "dists/<codename>/": relname is the
                    # suffix the Release file lists.  Note `separator` is
                    # rebound to the (unused) leading prefix by the unpack.
                    separator = os.path.join('dists/', env.get("DI_CODENAME"), '')
                    separator, relname = file.split(separator)
                    absname = os.path.join(env.get("MIRROR"), file)
                    url = os.path.join(env.get("files_debian_mirror"), file)
                    # Validate the file
                    _download(url, absname, checksums=sums, relname=relname)

                    # Get the list of extra files to download: those whose
                    # pathname matches di_match
                    dirname = os.path.dirname(file)
                    extra_files = []
                    with open(absname, "rt") as fd:
                        for line in fd:
                            # Each line is "<hash> <relative path>"; paths may
                            # carry a leading "./" that must be stripped.
                            hashsum, relname = line.split()
                            if not di_match.search(relname): continue
                            if relname.startswith("./"): relname = relname[2:]
                            extra_files.append({
                                "absname": os.path.join(env.get("MIRROR"), dirname, relname),
                                "relname": relname,
                                "url": os.path.join(env.get("files_debian_mirror"), dirname, relname),
                            })

                    # Check downloaded files against their corresponding checksums.
                    file_sums = Checksums(self.env)
                    file_sums.parse_checksums_file(absname, hashtype)
                    for f in extra_files:
                        # Download the extra files
                        _download(f["url"], f["absname"], checksums=file_sums, relname=f["relname"])