1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: © 2004 Tristan Seligmann and Jonathan Jacobs
# SPDX-FileCopyrightText: © 2012 Bastian Kleineidam
# SPDX-FileCopyrightText: © 2015 Tobias Gruetzmacher
from __future__ import annotations

import codecs
import contextlib
import glob
import logging
import mimetypes
import os
from datetime import datetime, timezone
from typing import Iterable, Iterator

from rich import filesize

from .events import getHandler
from .util import getFilename, unquote, urlopen
logger = logging.getLogger(__name__)

# Upper bound on the response size accepted for a single image (20 MB);
# handed to urlopen() as max_content_bytes.
MAX_IMAGE_BYTES = 20 * 1024 * 1024

# strftime() pattern for RFC 1123 dates, the format preferred by RFC 2616
RFC_1123_DT_STR = "%a, %d %b %Y %H:%M:%S GMT"
class ComicStrip:
    """A list of comic image URLs."""

    def __init__(self, scraper, strip_url: str, image_urls: Iterable[str],
                 text=None) -> None:
        """Store the image URL list.

        Args:
            scraper: Object providing ``namer(image_url, strip_url)``; it is
                also handed on to every ComicImage created here.
            strip_url: URL of the strip; passed to the namer and used as the
                referrer of each ComicImage.
            image_urls: The image URLs of this strip (an iterable of
                strings, not a single string). It is iterated each time
                getImages() is consumed.
            text: Optional text that is passed on to each ComicImage.
        """
        self.scraper = scraper
        self.strip_url = strip_url
        self.image_urls = image_urls
        self.text = text

    def getImages(self) -> Iterator[ComicImage]:
        """Yield one image downloader per stored image URL."""
        for image_url in self.image_urls:
            yield self.getDownloader(image_url)

    def getDownloader(self, url: str) -> ComicImage:
        """Get an image downloader for the given image URL.

        The target filename is determined by the scraper's namer from the
        image URL and the strip URL.
        """
        filename = self.scraper.namer(url, self.strip_url)
        return ComicImage(self.scraper, url, self.strip_url, filename,
                          text=self.text)
class ComicImage:
    """A comic image downloader."""

    # Chunk size requested from iter_content() while writing the image.
    ChunkBytes = 1024 * 100  # 100KB

    def __init__(self, scraper, url, referrer, filename, text=None) -> None:
        """Set URL and filename.

        Args:
            scraper: Object providing ``session`` and
                ``get_download_dir(basepath)``.
            url: URL of the image to download.
            referrer: Referrer passed to urlopen() for the request.
            filename: Target name; it is passed through getFilename() and
                split into base name and extension. A missing extension
                defaults to '.bin'.
            text: Optional text saved next to the image as a '.txt' file.
        """
        self.scraper = scraper
        self.referrer = referrer
        self.url = url
        filename = getFilename(filename)
        self.filename, self.ext = os.path.splitext(filename)
        if not self.ext:
            self.ext = '.bin'
        self.text = text

    def connect(self, lastchange=None):
        """Connect to host and get meta information.

        Sets ``self.urlobj`` and, unless the server answers 304, may update
        ``self.ext`` from the content type and sets ``self.contentLength``.

        Args:
            lastchange: Optional datetime sent as If-Modified-Since header.

        Raises:
            IOError: If the content type is neither an image nor one of the
                explicitly tolerated generic types.
        """
        headers = {}
        if lastchange:
            headers['If-Modified-Since'] = lastchange.strftime(RFC_1123_DT_STR)
        self.urlobj = urlopen(self.url, self.scraper.session,
                              referrer=self.referrer,
                              max_content_bytes=MAX_IMAGE_BYTES, stream=True,
                              headers=headers)
        if self.urlobj.status_code == 304:  # Not modified
            return
        content_type = unquote(self.urlobj.headers.get(
            'content-type', 'application/octet-stream'))
        # Drop parameters such as "; charset=..."
        content_type = content_type.split(';', 1)[0]
        maintype = content_type.split('/', 1)[0]
        if maintype != 'image' and content_type not in (
                'application/octet-stream', 'application/x-shockwave-flash'):
            raise IOError(
                f'content type {content_type!r} is not an image at {self.url}')
        # Try to guess "better" extension from mime type
        guessed_ext = mimetypes.guess_extension(content_type)
        if guessed_ext and guessed_ext != '.bin':
            self.ext = guessed_ext
        self.contentLength = int(self.urlobj.headers.get('content-length', 0))
        logger.debug('... filename = %r, ext = %r, contentLength = %d',
                     self.filename, self.ext, self.contentLength)

    def save(self, basepath):
        """Save comic URL to filename on disk.

        Returns:
            A tuple ``(filename, saved)``; ``saved`` is False when an
            existing file was kept instead of downloading.
        """
        fnbase = self._fnbase(basepath)
        # Escape the base name so that glob metacharacters ('[', '*', '?')
        # in the download path are matched literally.
        pattern = glob.escape(fnbase) + ".*"
        exist = [x for x in glob.glob(pattern) if not x.endswith(".txt")]
        # NOTE(review): moreinfo() is not a standard logging.Logger method;
        # presumably it is registered elsewhere in the project - confirm.
        logger.moreinfo("Get image URL %r", self.url)
        if len(exist) == 1:
            lastchange = os.path.getmtime(exist[0])
            # Timezone-aware UTC timestamp; formats identically to the
            # deprecated datetime.utcfromtimestamp().
            self.connect(datetime.fromtimestamp(lastchange, tz=timezone.utc))
            if self.urlobj.status_code == 304:  # Not modified
                self._exist_err(exist[0])
                return exist[0], False
        else:
            self.connect()
        fn = fnbase + self.ext
        # compare with >= since content length could be the compressed size
        if os.path.isfile(fn) and os.path.getsize(fn) >= self.contentLength:
            self._exist_err(fn)
            return fn, False
        logger.debug('Writing comic to file %r...', fn)
        with self.fileout(fn) as f:
            for chunk in self.urlobj.iter_content(self.ChunkBytes):
                f.write(chunk)
        if self.text:
            fntext = fnbase + ".txt"
            logger.debug('Writing comic text to file %s...', fntext)
            with self.fileout(fntext, encoding='utf-8') as f:
                f.write(self.text)
        getHandler().comicDownloaded(self, fn)
        return fn, True

    @contextlib.contextmanager
    def fileout(self, filename, encoding=None):
        """Yield a file object opened for writing the given filename.

        The file is opened in binary mode, or in text mode with the given
        encoding if one is passed. If an exception is raised while opening
        or writing, the file is removed (when it exists) and the exception
        is re-raised. On success the written size is logged. No check for
        empty output is performed here.
        """
        try:
            if encoding:
                # newline='' writes line endings exactly as given, without
                # translating them to the platform convention.
                fp = open(filename, 'w', encoding=encoding, newline='')
            else:
                fp = open(filename, 'wb')
            with fp:
                yield fp
                size = fp.tell()
        except Exception:
            # Do not leave a partially written file behind
            if os.path.isfile(filename):
                os.remove(filename)
            raise
        else:
            logger.info("Saved %r (%s).", filename, filesize.decimal(size))

    def _exist_err(self, fn):
        """Log that the existing file is kept and the download is skipped."""
        logger.info('Skipping existing file %r.', fn)

    def _fnbase(self, basepath):
        """Determine the target base name of this comic file and make sure
        the directory exists."""
        comicdir = self.scraper.get_download_dir(basepath)
        # exist_ok avoids the check-then-create race of a separate isdir()
        os.makedirs(comicdir, exist_ok=True)
        return os.path.join(comicdir, self.filename)
|