1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
import io
import zlib
from gzip import GzipFile
from typing import Optional
from storages.utils import to_bytes
class GzipCompressionWrapper(io.RawIOBase):
"""Wrapper for compressing file contents on the fly."""
def __init__(self, raw, level=zlib.Z_BEST_COMPRESSION):
super().__init__()
self.raw = raw
self.compress = zlib.compressobj(level=level, wbits=31)
self.leftover = bytearray()
@staticmethod
def readable():
return True
def readinto(self, buf: bytearray) -> Optional[int]:
size = len(buf)
while len(self.leftover) < size:
chunk = to_bytes(self.raw.read(size))
if not chunk:
if self.compress:
self.leftover += self.compress.flush(zlib.Z_FINISH)
self.compress = None
break
self.leftover += self.compress.compress(chunk)
if len(self.leftover) == 0:
return 0
output = self.leftover[:size]
size = len(output)
buf[:size] = output
self.leftover = self.leftover[size:]
return size
class CompressStorageMixin:
def _compress_content(self, content):
"""Gzip a given string content."""
return GzipCompressionWrapper(content)
class CompressedFileMixin:
def _decompress_file(self, mode, file, mtime=0.0):
return GzipFile(mode=mode, fileobj=file, mtime=mtime)
|