1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
# Copyright 2019 the gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A search algorithm over the space of all bytestrings."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import base64
import hashlib
import itertools
import logging
import struct
from examples.python.cancellation import hash_name_pb2
_LOGGER = logging.getLogger(__name__)
_BYTE_MAX = 255
def _get_hamming_distance(a, b):
"""Calculates hamming distance between strings of equal length."""
distance = 0
for char_a, char_b in zip(a, b):
if char_a != char_b:
distance += 1
return distance
def _get_substring_hamming_distance(candidate, target):
"""Calculates the minimum hamming distance between between the target
and any substring of the candidate.
Args:
candidate: The string whose substrings will be tested.
target: The target string.
Returns:
The minimum Hamming distance between candidate and target.
"""
min_distance = None
if len(target) > len(candidate):
raise ValueError("Candidate must be at least as long as target.")
for i in range(len(candidate) - len(target) + 1):
distance = _get_hamming_distance(candidate[i:i + len(target)].lower(),
target.lower())
if min_distance is None or distance < min_distance:
min_distance = distance
return min_distance
def _get_hash(secret):
hasher = hashlib.sha1()
hasher.update(secret)
return base64.b64encode(hasher.digest()).decode('ascii')
class ResourceLimitExceededError(Exception):
"""Signifies the request has exceeded configured limits."""
def _bytestrings_of_length(length):
"""Generates a stream containing all bytestrings of a given length.
Args:
length: A positive integer length.
Yields:
All bytestrings of length `length`.
"""
for digits in itertools.product(range(_BYTE_MAX), repeat=length):
yield b''.join(struct.pack('B', i) for i in digits)
def _all_bytestrings():
"""Generates a stream containing all possible bytestrings.
This generator does not terminate.
Yields:
All bytestrings in ascending order of length.
"""
for bytestring in itertools.chain.from_iterable(
_bytestrings_of_length(length) for length in itertools.count()):
yield bytestring
def search(target,
ideal_distance,
stop_event,
maximum_hashes,
interesting_hamming_distance=None):
"""Find candidate strings.
Search through the space of all bytestrings, in order of increasing length,
indefinitely, until a hash with a Hamming distance of `maximum_distance` or
less has been found.
Args:
target: The search string.
ideal_distance: The desired Hamming distance.
stop_event: An event indicating whether the RPC should terminate.
maximum_hashes: The maximum number of hashes to check before stopping.
interesting_hamming_distance: If specified, strings with a Hamming
distance from the target below this value will be yielded.
Yields:
Instances of HashNameResponse. The final entry in the stream will be of
`maximum_distance` Hamming distance or less from the target string,
while all others will be of less than `interesting_hamming_distance`.
Raises:
ResourceLimitExceededError: If the computation exceeds `maximum_hashes`
iterations.
"""
hashes_computed = 0
for secret in _all_bytestrings():
if stop_event.is_set():
return
candidate_hash = _get_hash(secret)
distance = _get_substring_hamming_distance(candidate_hash, target)
if interesting_hamming_distance is not None and distance <= interesting_hamming_distance:
# Surface interesting candidates, but don't stop.
yield hash_name_pb2.HashNameResponse(
secret=base64.b64encode(secret),
hashed_name=candidate_hash,
hamming_distance=distance)
elif distance <= ideal_distance:
# Yield ideal candidate and end the stream.
yield hash_name_pb2.HashNameResponse(
secret=base64.b64encode(secret),
hashed_name=candidate_hash,
hamming_distance=distance)
return
hashes_computed += 1
if hashes_computed == maximum_hashes:
raise ResourceLimitExceededError()
|