File: search.py

package info (click to toggle)
grpc 1.51.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 76,144 kB
  • sloc: cpp: 361,866; python: 72,206; ansic: 37,778; objc: 12,434; ruby: 11,521; sh: 7,652; php: 7,615; makefile: 3,481; xml: 3,246; cs: 1,836; javascript: 1,614; java: 465; pascal: 227; awk: 132
file content (148 lines) | stat: -rw-r--r-- 5,062 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright 2019 the gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A search algorithm over the space of all bytestrings."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import base64
import hashlib
import itertools
import logging
import struct

from examples.python.cancellation import hash_name_pb2

_LOGGER = logging.getLogger(__name__)
_BYTE_MAX = 255


def _get_hamming_distance(a, b):
    """Calculates hamming distance between strings of equal length."""
    distance = 0
    for char_a, char_b in zip(a, b):
        if char_a != char_b:
            distance += 1
    return distance


def _get_substring_hamming_distance(candidate, target):
    """Calculates the minimum hamming distance between between the target
        and any substring of the candidate.

    Args:
      candidate: The string whose substrings will be tested.
      target: The target string.

    Returns:
      The minimum Hamming distance between candidate and target.
    """
    min_distance = None
    if len(target) > len(candidate):
        raise ValueError("Candidate must be at least as long as target.")
    for i in range(len(candidate) - len(target) + 1):
        distance = _get_hamming_distance(candidate[i:i + len(target)].lower(),
                                         target.lower())
        if min_distance is None or distance < min_distance:
            min_distance = distance
    return min_distance


def _get_hash(secret):
    hasher = hashlib.sha1()
    hasher.update(secret)
    return base64.b64encode(hasher.digest()).decode('ascii')


class ResourceLimitExceededError(Exception):
    """Signifies the request has exceeded configured limits."""


def _bytestrings_of_length(length):
    """Generates a stream containing all bytestrings of a given length.

    Args:
      length: A positive integer length.

    Yields:
      All bytestrings of length `length`.
    """
    for digits in itertools.product(range(_BYTE_MAX), repeat=length):
        yield b''.join(struct.pack('B', i) for i in digits)


def _all_bytestrings():
    """Generates a stream containing all possible bytestrings.

    This generator does not terminate.

    Yields:
      All bytestrings in ascending order of length.
    """
    for bytestring in itertools.chain.from_iterable(
            _bytestrings_of_length(length) for length in itertools.count()):
        yield bytestring


def search(target,
           ideal_distance,
           stop_event,
           maximum_hashes,
           interesting_hamming_distance=None):
    """Find candidate strings.

    Search through the space of all bytestrings, in order of increasing length,
    indefinitely, until a hash with a Hamming distance of `maximum_distance` or
    less has been found.

    Args:
      target: The search string.
      ideal_distance: The desired Hamming distance.
      stop_event: An event indicating whether the RPC should terminate.
      maximum_hashes: The maximum number of hashes to check before stopping.
      interesting_hamming_distance: If specified, strings with a Hamming
        distance from the target below this value will be yielded.

    Yields:
      Instances  of HashNameResponse. The final entry in the stream will be of
        `maximum_distance` Hamming distance or less from the target string,
        while all others will be of less than `interesting_hamming_distance`.

    Raises:
      ResourceLimitExceededError: If the computation exceeds `maximum_hashes`
        iterations.
    """
    hashes_computed = 0
    for secret in _all_bytestrings():
        if stop_event.is_set():
            return
        candidate_hash = _get_hash(secret)
        distance = _get_substring_hamming_distance(candidate_hash, target)
        if interesting_hamming_distance is not None and distance <= interesting_hamming_distance:
            # Surface interesting candidates, but don't stop.
            yield hash_name_pb2.HashNameResponse(
                secret=base64.b64encode(secret),
                hashed_name=candidate_hash,
                hamming_distance=distance)
        elif distance <= ideal_distance:
            # Yield ideal candidate and end the stream.
            yield hash_name_pb2.HashNameResponse(
                secret=base64.b64encode(secret),
                hashed_name=candidate_hash,
                hamming_distance=distance)
            return
        hashes_computed += 1
        if hashes_computed == maximum_hashes:
            raise ResourceLimitExceededError()