File: comic.py

package info (click to toggle)
dosage 3.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,400 kB
  • sloc: python: 12,703; sh: 55; makefile: 6
file content (153 lines) | stat: -rw-r--r-- 5,828 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: © 2004 Tristan Seligmann and Jonathan Jacobs
# SPDX-FileCopyrightText: © 2012 Bastian Kleineidam
# SPDX-FileCopyrightText: © 2015 Tobias Gruetzmacher
from __future__ import annotations

import codecs
import contextlib
import glob
import logging
import mimetypes
import os
from datetime import datetime, timezone
from typing import Iterator

from rich import filesize

from .events import getHandler
from .util import getFilename, unquote, urlopen

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Largest response body accepted for a single image download.
MAX_IMAGE_BYTES = 20 * 1024 * 1024  # 20 MB
# strftime layout of an RFC 1123 date, the form RFC 2616 prefers for HTTP.
RFC_1123_DT_STR = "%a, %d %b %Y %H:%M:%S GMT"


class ComicStrip:
    """A list of comic image URLs.

    Bundles the image URLs found on one strip page together with the
    scraper that found them, the URL of the strip page itself and an
    optional text that is handed on to every image downloader.
    """

    def __init__(self, scraper, strip_url: str, image_urls: list[str],
                 text=None) -> None:
        """Store the image URL list.

        :param scraper: scraper object; its ``namer(image_url, strip_url)``
            method is used to derive a file name for each image.
        :param strip_url: URL of the strip page; used as the referrer of
            the image downloads.
        :param image_urls: URLs of the images belonging to this strip.
        :param text: optional text passed on to every ``ComicImage``.
        """
        self.scraper = scraper
        self.strip_url = strip_url
        self.image_urls = image_urls
        self.text = text

    def getImages(self) -> Iterator[ComicImage]:
        """Yield one image downloader per stored image URL, lazily."""
        for image_url in self.image_urls:
            yield self.getDownloader(image_url)

    def getDownloader(self, url: str) -> ComicImage:
        """Get an image downloader for a single image URL.

        The target file name is computed by the scraper's ``namer`` from
        the image URL and the strip URL.
        """
        filename = self.scraper.namer(url, self.strip_url)
        return ComicImage(self.scraper, url, self.strip_url, filename,
                          text=self.text)


class ComicImage:
    """A comic image downloader.

    Holds the image URL, the referring strip URL and the target file name
    (split into base name and extension) and knows how to fetch the image
    and store it below a base directory.
    """

    # Size of the chunks requested from the response body while saving.
    ChunkBytes = 1024 * 100  # 100KB

    def __init__(self, scraper, url, referrer, filename, text=None) -> None:
        """Set URL and filename.

        :param scraper: scraper object; its ``session`` is used for the HTTP
            request and its ``get_download_dir()`` for the target directory.
        :param url: URL of the image.
        :param referrer: URL sent as referrer when fetching the image.
        :param filename: proposed file name; it is passed through
            ``getFilename()`` and then split into base name and extension.
        :param text: optional text saved next to the image as ``.txt`` file.
        """
        self.scraper = scraper
        self.referrer = referrer
        self.url = url
        filename = getFilename(filename)
        self.filename, self.ext = os.path.splitext(filename)
        if not self.ext:
            # Placeholder until connect() can guess a better extension
            # from the content type.
            self.ext = '.bin'
        self.text = text

    def connect(self, lastchange=None):
        """Connect to host and get meta information.

        Stores the response in ``self.urlobj``. Unless the server answers
        with 304, also updates ``self.ext`` from the content type (when a
        better extension can be guessed) and sets ``self.contentLength``.

        :param lastchange: optional datetime sent as ``If-Modified-Since``.
            It is formatted with a literal ``GMT`` suffix, so it has to
            represent UTC.
        :raises IOError: if the content type is neither an image nor one of
            the explicitly tolerated generic/flash types.
        """
        headers = {}
        if lastchange:
            headers['If-Modified-Since'] = lastchange.strftime(RFC_1123_DT_STR)
        self.urlobj = urlopen(self.url, self.scraper.session,
                              referrer=self.referrer,
                              max_content_bytes=MAX_IMAGE_BYTES, stream=True,
                              headers=headers)
        if self.urlobj.status_code == 304:  # Not modified
            # Nothing to inspect; note that contentLength is not set here.
            return
        content_type = unquote(self.urlobj.headers.get(
            'content-type', 'application/octet-stream'))
        # Strip parameters such as "; charset=...".
        content_type = content_type.split(';', 1)[0]
        maintype = content_type.split('/', 1)[0]
        if maintype != 'image' and content_type not in (
                'application/octet-stream', 'application/x-shockwave-flash'):
            raise IOError('content type %r is not an image at %s' % (
                content_type, self.url))
        # Try to guess "better" extension from mime type
        guessed_ext = mimetypes.guess_extension(content_type)
        if guessed_ext and guessed_ext != '.bin':
            self.ext = guessed_ext
        # A missing header is treated as length 0.
        self.contentLength = int(self.urlobj.headers.get('content-length', 0))
        logger.debug('... filename = %r, ext = %r, contentLength = %d', self.filename, self.ext, self.contentLength)

    def save(self, basepath):
        """Save comic URL to filename on disk.

        If exactly one file with the same base name (ignoring ``.txt``)
        already exists, its modification time is sent to the server so an
        unchanged image does not have to be transferred again.

        :param basepath: base directory handed to the scraper to determine
            the download directory.
        :returns: tuple ``(filename, downloaded)`` where ``downloaded`` is
            False when an existing file was kept.
        """
        fnbase = self._fnbase(basepath)
        # Escape the base name so that glob metacharacters ("[", "*", "?")
        # in the directory or file name are matched literally.
        pattern = glob.escape(fnbase) + ".*"
        exist = [x for x in glob.glob(pattern) if not x.endswith(".txt")]
        logger.moreinfo("Get image URL %r", self.url)
        if len(exist) == 1:
            lastchange = os.path.getmtime(exist[0])
            # Timezone-aware UTC datetime; utcfromtimestamp() is deprecated.
            self.connect(datetime.fromtimestamp(lastchange, tz=timezone.utc))
            if self.urlobj.status_code == 304:  # Not modified
                self._exist_err(exist[0])
                return exist[0], False
        else:
            self.connect()
        fn = fnbase + self.ext
        # compare with >= since content length could be the compressed size
        if os.path.isfile(fn) and os.path.getsize(fn) >= self.contentLength:
            self._exist_err(fn)
            return fn, False
        logger.debug('Writing comic to file %r...', fn)
        with self.fileout(fn) as f:
            for chunk in self.urlobj.iter_content(self.ChunkBytes):
                f.write(chunk)
        if self.text:
            fntext = fnbase + ".txt"
            logger.debug('Writing comic text to file %s...', fntext)
            with self.fileout(fntext, encoding='utf-8') as f:
                f.write(self.text)
        getHandler().comicDownloaded(self, fn)
        return fn, True

    @contextlib.contextmanager
    def fileout(self, filename, encoding=None):
        """Context manager yielding a file object writing to filename.

        If encoding is given writes to a codecs.open() file, otherwise to a
        binary file. When the body of the ``with`` statement fails for any
        reason, the incomplete file is removed and the error is re-raised;
        on success the saved size is logged.
        """
        def getfp(filename, encoding):
            """Get open file object."""
            if encoding:
                return codecs.open(filename, 'w', encoding)
            return open(filename, 'wb')

        try:
            with getfp(filename, encoding) as fp:
                yield fp
                size = fp.tell()
        except BaseException:
            # BaseException on purpose: an interrupted download (e.g.
            # KeyboardInterrupt) must not leave a truncated file behind,
            # since save() would later treat it as an up-to-date copy.
            if os.path.isfile(filename):
                os.remove(filename)
            raise
        else:
            logger.info("Saved %r (%s).", filename, filesize.decimal(size))

    def _exist_err(self, fn):
        """Log that the already existing file fn is kept."""
        logger.info('Skipping existing file %r.', fn)

    def _fnbase(self, basepath):
        '''Determine the target base name of this comic file and make sure the
        directory exists.'''
        comicdir = self.scraper.get_download_dir(basepath)
        # exist_ok avoids the check-then-create race of a separate isdir().
        os.makedirs(comicdir, exist_ok=True)
        return os.path.join(comicdir, self.filename)