File: base.py

package info (click to toggle)
python-multiurl 0.3.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 168 kB
  • sloc: python: 1,318; makefile: 4
file content (154 lines) | stat: -rw-r--r-- 3,912 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# (C) Copyright 2021 ECMWF.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.
#

import logging
import os

LOG = logging.getLogger(__name__)


def _ignore(*args, **kwargs):
    pass


class NoBar:
    def __init__(self, *args, **kwargs):
        pass

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        pass

    def update(self, *args, **kwargs):
        pass

    def close(self, *args, **kwargs):
        pass


def progress_bar(total, initial=0, desc=None):
    try:
        # There is a bug in tqdm that expects ipywidgets
        # to be installed if running in a notebook
        import ipywidgets  # noqa F401
        from tqdm.auto import tqdm  # noqa F401
    except ImportError:
        try:
            from tqdm import tqdm  # noqa F401
        except ImportError:
            tqdm = NoBar

    return tqdm(
        total=total,
        initial=initial,
        unit_scale=True,
        unit_divisor=1024,
        unit="B",
        disable=False,
        leave=False,
        desc=desc,
    )


class DownloaderBase:
    def __init__(
        self,
        url,
        chunk_size=1024 * 1024,
        timeout=None,
        parts=None,
        observer=_ignore,
        statistics_gatherer=_ignore,
        progress_bar=progress_bar,
        resume_transfers=False,
        override_target_file=True,
        download_file_extension=None,
        auth=None,
        **kwargs,
    ):
        self.url = url
        self.chunk_size = chunk_size
        self.timeout = timeout
        self.parts = parts
        self.observer = observer
        self.statistics_gatherer = statistics_gatherer
        self.progress_bar = progress_bar
        self.resume_transfers = resume_transfers
        self.override_target_file = override_target_file
        self.download_file_extension = download_file_extension
        self.auth = auth

    def mutate(self, *args, **kwargs):
        return self

    def local_path(self):
        return None

    def extension(self, url=None):
        if url is None:
            url = self.url
        url_no_args = url.split("?")[0]
        base = os.path.basename(url_no_args)
        extensions = []
        while True:
            base, ext = os.path.splitext(base)
            if not ext:
                break
            extensions.append(ext)
        if not extensions:
            extensions.append(".unknown")
        return "".join(reversed(extensions))

    def download(self, target):
        if os.path.exists(target) and not self.override_target_file:
            return

        if self.download_file_extension is not None:
            download = target + ".download"
        else:
            download = target

        LOG.info("Downloading %s", self.url)

        size, mode, skip, trust_size = self.estimate_size(download)

        with self.progress_bar(
            total=size,
            initial=skip,
            desc=self.title(),
        ) as pbar:
            with open(download, mode) as f:
                total = self.transfer(f, pbar)

            pbar.close()

        if trust_size and size is not None:
            assert (
                os.path.getsize(download) == size
            ), f"File size mismatch {os.path.getsize(download)} bytes instead of {size}"

        if download != target:
            os.rename(download, target)

        self.finalise()
        return total

    def finalise(self):
        pass

    def title(self):
        return os.path.basename(self.url)

    def cache_data(self):
        return None

    def out_of_date(self, path, cache_data):
        return False