File: hashing.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (79 lines) | stat: -rw-r--r-- 2,391 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
######################################################################
#
# File: b2sdk/_internal/stream/hashing.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import hashlib
import io

from b2sdk._internal.stream.base import ReadOnlyStreamMixin
from b2sdk._internal.stream.wrapper import StreamWithLengthWrapper


class StreamWithHash(ReadOnlyStreamMixin, StreamWithLengthWrapper):
    """
    Wrap a file-like object, calculate a SHA1 digest of the data while it is
    being read, and append the hex-encoded digest to the end of the stream.

    A consumer reading this stream to exhaustion therefore receives the
    payload of the wrapped stream immediately followed by the ASCII hex
    digest of that payload.
    """

    def __init__(self, stream, stream_length=None):
        """
        :param stream: the stream to read from
        :param stream_length: length of the wrapped stream in bytes, or
                              ``None`` if unknown; when given, the length
                              passed to the parent wrapper is extended by
                              the length of the hex digest
        """
        # The digest object must exist before the length is computed,
        # because the length of the appended hash is derived from it.
        self.digest = self.get_digest()
        total_length = None
        if stream_length is not None:
            # A hex digest uses two characters per digest byte.
            total_length = stream_length + self.digest.digest_size * 2
        super().__init__(stream, length=total_length)
        # Hex digest of the payload; stays None until the payload is exhausted.
        self.hash = None
        # Number of hash characters already handed out by read().
        self.hash_read = 0

    def seek(self, pos, whence=0):
        """
        Seek to the beginning of the stream and restart hashing.

        Only rewinding to the very beginning is supported, because the
        digest has to be recomputed from the first byte of the payload.

        :param int pos: position in the stream, must be ``0``
        :param int whence: reference point for ``pos``, must be ``0``
        :return: whatever the parent wrapper's ``seek(0)`` returns
        :raises io.UnsupportedOperation: for any target other than the
                                         beginning of the stream
        """
        if pos != 0 or whence != 0:
            raise io.UnsupportedOperation('Stream with hash can only be seeked to beginning')
        # Start over with a fresh digest and forget any hash already emitted.
        self.digest = self.get_digest()
        self.hash = None
        self.hash_read = 0
        return super().seek(0)

    def read(self, size=None):
        """
        Read data from the stream.

        Payload bytes are returned first; once the wrapped stream is
        exhausted, the hex digest of the payload is returned (possibly in
        the same call, possibly split across several calls).

        :param int size: maximum number of bytes to read; ``None`` or a
                         negative value means "read everything that is left"
        :return: read data
        :rtype: bytes
        """
        # Follow the io convention that a negative size means "read all".
        # Without this, the end-of-stream check below never fires for
        # read(-1) and the hash would never be appended.
        if size is not None and size < 0:
            size = None

        data = b''
        if self.hash is None:
            data = super().read(size)
            # Update hash
            self.digest.update(data)

            # Check for end of stream.
            # NOTE(review): a short read is treated as end of stream, so the
            # wrapped stream is assumed to return fewer bytes than requested
            # only at EOF - confirm for non-file sources.
            if size is None or len(data) < size:
                self.hash = self.digest.hexdigest()
                if size is not None:
                    # Remaining budget that may be filled with hash characters.
                    size -= len(data)

        if self.hash is not None:
            # The end of stream was reached, return hash now.
            remaining = len(self.hash) - self.hash_read
            # An explicit size of 0 must yield nothing, so test against None
            # rather than relying on truthiness.
            count = remaining if size is None else min(size, remaining)
            data += self.hash[self.hash_read : self.hash_read + count].encode()
            # Advance only by what was actually emitted, so hash_read never
            # runs past the end of the hash.
            self.hash_read += count
        return data

    @classmethod
    def get_digest(cls):
        """
        Create a fresh digest object used to hash the payload.

        :return: a new ``hashlib`` SHA1 hash object
        """
        return hashlib.sha1()