File: test_incremental_hex_digester.py

package info
  python-b2sdk 2.8.0-1
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file: test/unit/utils/test_incremental_hex_digester.py (78 lines, 2,705 bytes)
######################################################################
#
# File: test/unit/utils/test_incremental_hex_digester.py
#
# Copyright 2022 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import hashlib
import io

from b2sdk._internal.utils import (
    IncrementalHexDigester,
    Sha1HexDigest,
)
from test.unit.test_base import TestBase


class TestIncrementalHexDigester(TestBase):
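    # Exercises IncrementalHexDigester.update_from_stream with a deliberately
    # small block size so that multi-block and partial-block reads are covered.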
    BLOCK_SIZE = 4

    def _get_sha1(self, input_data: bytes) -> Sha1HexDigest:
        return Sha1HexDigest(hashlib.sha1(input_data).hexdigest())

    def _get_digester(self, stream: io.IOBase) -> IncrementalHexDigester:
        return IncrementalHexDigester(stream, block_size=self.BLOCK_SIZE)

    def test_limited_read(self):
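        # Hash only the first `limit` bytes of a longer stream; the stream
        # position should end up exactly at `limit`.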
        limit = self.BLOCK_SIZE * 10
        input_data = b'1' * limit * 2
        stream = io.BytesIO(input_data)
        expected_sha1 = self._get_sha1(input_data[:limit])

        result_sha1 = self._get_digester(stream).update_from_stream(limit)

        self.assertEqual(expected_sha1, result_sha1)
        self.assertEqual(limit, stream.tell())

    def test_limited_read__stream_smaller_than_block_size(self):
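        # A stream shorter than both the limit and the block size is hashed
        # in full, and the cursor stops at the actual stream length.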
        limit = self.BLOCK_SIZE * 99
        input_data = b'1' * (self.BLOCK_SIZE - 1)
        stream = io.BytesIO(input_data)
        expected_sha1 = self._get_sha1(input_data)

        result_sha1 = self._get_digester(stream).update_from_stream(limit)

        self.assertEqual(expected_sha1, result_sha1)
        self.assertEqual(len(input_data), stream.tell())

    def test_unlimited_read(self):
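        # With no limit given, the digester consumes the entire stream.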
        input_data = b'1' * self.BLOCK_SIZE * 10
        stream = io.BytesIO(input_data)
        expected_sha1 = self._get_sha1(input_data)

        result_sha1 = self._get_digester(stream).update_from_stream()

        self.assertEqual(expected_sha1, result_sha1)
        self.assertEqual(len(input_data), stream.tell())

    def test_limited_and_unlimited_read(self):
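        # Interleave several limited reads with a final unlimited read; after
        # each call the digest must cover everything read so far.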
        blocks_count = 5
        limit = self.BLOCK_SIZE * 5
        input_data = b'1' * limit * blocks_count
        stream = io.BytesIO(input_data)

        digester = self._get_digester(stream)

        for idx in range(blocks_count - 1):
            expected_sha1_part = self._get_sha1(input_data[: limit * (idx + 1)])
            result_sha1_part = digester.update_from_stream(limit)
            self.assertEqual(expected_sha1_part, result_sha1_part)

        expected_sha1_whole = self._get_sha1(input_data)
        result_sha1_whole = digester.update_from_stream()
        self.assertEqual(expected_sha1_whole, result_sha1_whole)