File: hyperscan_pathspec_r1.py

package info (click to toggle)
python-pathspec 1.0.4-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,140 kB
  • sloc: python: 9,685; makefile: 14
file content (249 lines) | stat: -rw-r--r-- 6,545 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""
This module defines the hyperscan backends for `.PathSpec`, revision 1, used in
benchmarking, but not included in the released library.
"""
from __future__ import annotations

from collections.abc import (
	Sequence)
from typing import (
	Any,
	Callable,  # Replaced by `collections.abc.Callable` in 3.9.2.
	Optional)  # Replaced by `X | None` in 3.10.
from typing_extensions import (
	override)  # Added in 3.12.

try:
	import hyperscan
except ModuleNotFoundError:
	hyperscan = None

from pathspec._backends.hyperscan._base import (
	HS_FLAGS,
	HyperscanExprDat)
from pathspec._backends.hyperscan.pathspec import (
	HyperscanPsBackend)
from pathspec.pattern import (
	RegexPattern)


class HyperscanPsR1BaseBackend(HyperscanPsBackend):
	"""
	The :class:`HyperscanPsR1BaseBackend` class is the common base of the
	revision 1 hyperscan backends. It compiles the patterns into a hyperscan
	database, and leaves creating the database (and so choosing its mode) to
	subclasses.
	"""

	@override
	@staticmethod
	def _init_db(
		db: hyperscan.Database,
		debug: bool,
		patterns: list[tuple[int, RegexPattern]],
		sort_ids: Optional[Callable[[list[int]], None]],
	) -> list[HyperscanExprDat]:
		"""
		Compile the patterns into the hyperscan database.

		*db* (:class:`hyperscan.Database`) is the database to compile into.

		*debug* (:class:`bool`) is not used by this implementation.

		*patterns* (:class:`list` of :class:`tuple`) contains each pattern index
		(:class:`int`) paired with its pattern (:class:`.RegexPattern`).

		*sort_ids* (:class:`callable` or :data:`None`) is not used by this
		implementation.

		Returns the expression data (:class:`list` of :class:`HyperscanExprDat`).
		The position of each entry equals the expression id it was compiled with.
		"""
		# NOTICE: This is the current implementation.

		# Collect the expression data and the encoded regexes in lockstep.
		dat_list: list[HyperscanExprDat] = []
		regex_list: list[bytes] = []
		for pat_index, pat in patterns:
			if pat.include is None:
				# Patterns without an include value are left out of the database.
				continue

			assert isinstance(pat, RegexPattern), pat
			raw_regex = pat.regex.pattern

			# The expressions are collected as bytes; encode string regexes.
			if not isinstance(raw_regex, bytes):
				assert isinstance(raw_regex, str), raw_regex
				raw_regex = raw_regex.encode('utf8')

			dat_list.append(HyperscanExprDat(
				include=pat.include,
				index=pat_index,
				is_dir_pattern=False,
			))
			regex_list.append(raw_regex)

		# Compile everything at once, using list positions as expression ids.
		expr_count = len(regex_list)
		db.compile(
			expressions=regex_list,
			ids=list(range(expr_count)),
			elements=expr_count,
			flags=HS_FLAGS,
		)
		return dat_list

	@override
	@staticmethod
	def _make_db() -> hyperscan.Database:
		"""
		Create the hyperscan database. Subclasses must implement this.

		Raises :class:`NotImplementedError` in this base class.
		"""
		raise NotImplementedError()


class _HyperscanPsR1BlockBaseBackend(HyperscanPsR1BaseBackend):
	"""
	The :class:`_HyperscanPsR1BlockBaseBackend` class is the base for the
	revision 1 backends that match files with a block mode hyperscan database.
	"""

	@override
	@staticmethod
	def _make_db() -> hyperscan.Database:
		"""
		Create the hyperscan database in block mode.

		Returns the database (:class:`hyperscan.Database`).
		"""
		block_mode = hyperscan.HS_MODE_BLOCK
		return hyperscan.Database(mode=block_mode)


class HyperscanPsR1BlockClosureBackend(_HyperscanPsR1BlockBaseBackend):
	"""
	The :class:`HyperscanPsR1BlockClosureBackend` class uses a hyperscan database
	in block mode for matching files, and uses a closure to capture state.
	"""

	# Prevent accidental usage.
	_out: tuple[()]

	@override
	def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
		"""
		Check the file against the compiled patterns.

		*file* (:class:`str`) is the file path to check.

		Returns a :class:`tuple` containing the include value (:class:`bool` or
		:data:`None`) of the matching pattern with the highest pattern index, and
		that pattern index (:class:`int` or :data:`None`). Both are :data:`None`
		when no pattern matched.
		"""
		# Start from "no match": the include value must be None (not False) so the
		# result agrees with the declared return type and with the state backends.
		out_include: Optional[bool] = None
		best_index = -1

		def on_match(
			expr_id: int, _from: int, _to: int, _flags: int, _context: Any,
		) -> Optional[bool]:
			# WARNING: Hyperscan does not guarantee matches will be produced in order!
			# Keep only the match with the highest pattern index.
			nonlocal out_include, best_index
			expr_dat = self._expr_data[expr_id]
			index = expr_dat.index
			if index > best_index:
				out_include = expr_dat.include
				best_index = index

		self._db.scan(file.encode('utf8'), match_event_handler=on_match)

		# Translate the -1 sentinel into None for the caller.
		out_index: Optional[int] = None if best_index == -1 else best_index
		return (out_include, out_index)


class HyperscanPsR1BlockStateBackend(_HyperscanPsR1BlockBaseBackend):
	"""
	The :class:`HyperscanPsR1BlockStateBackend` class uses a hyperscan database in
	block mode for matching files, and stores state in variables.
	"""

	def __init__(self, patterns: Sequence[RegexPattern]) -> None:
		"""
		Initialize the :class:`HyperscanPsR1BlockStateBackend` instance.

		*patterns* (:class:`~collections.abc.Sequence` of :class:`.RegexPattern`)
		is passed on to the base class initializer.
		"""
		super().__init__(patterns)

		# The running result of the current scan: the include value and the pattern
		# index of the best match so far. An index of -1 means "no match yet".
		self._out: tuple[Optional[bool], int] = (None, -1)

	@override
	def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
		"""
		Check the file against the compiled patterns.

		*file* (:class:`str`) is the file path to check.

		Returns a :class:`tuple` containing the include value (:class:`bool` or
		:data:`None`) of the matching pattern with the highest pattern index, and
		that pattern index (:class:`int` or :data:`None`). Both are :data:`None`
		when no pattern matched.
		"""
		# Reset the state left over from the previous scan.
		self._out = (None, -1)
		self._db.scan(file.encode('utf8'), match_event_handler=self.__on_match)

		# Translate the -1 sentinel into None for the caller.
		out_include, out_index = self._out
		if out_index == -1:
			out_index = None

		return (out_include, out_index)

	# NOTE: This method is not decorated with `@override`. Its double-underscore
	# name is mangled per class, so it cannot override a base class method.
	def __on_match(
		self,
		expr_id: int,
		_from: int,
		_to: int,
		_flags: int,
		_context: Any,
	) -> Optional[bool]:
		"""
		Record a match reported during a scan. This is passed to the database as
		the match event handler.

		*expr_id* (:class:`int`) is the id of the matched expression, which is the
		position of its entry in the expression data.

		The remaining arguments are not used.

		Always returns :data:`None`.
		"""
		# WARNING: Hyperscan does not guarantee matches will be produced in order!
		# Keep only the match with the highest pattern index.
		expr_dat = self._expr_data[expr_id]
		index = expr_dat.index
		prev_index = self._out[1]
		if index > prev_index:
			self._out = (expr_dat.include, index)


class _HyperscanPsR1StreamBaseBackend(HyperscanPsR1BaseBackend):
	"""
	The :class:`_HyperscanPsR1StreamBaseBackend` class is the base for the
	revision 1 backends that match files with a streaming mode hyperscan
	database.
	"""

	@override
	@staticmethod
	def _make_db() -> hyperscan.Database:
		"""
		Create the hyperscan database in streaming mode.

		Returns the database (:class:`hyperscan.Database`).
		"""
		stream_mode = hyperscan.HS_MODE_STREAM
		return hyperscan.Database(mode=stream_mode)


class HyperscanPsR1StreamClosureBackend(_HyperscanPsR1StreamBaseBackend):
	"""
	The :class:`HyperscanPsR1StreamClosureBackend` class uses a hyperscan database
	in streaming mode for matching files, and uses a closure to capture state.
	"""

	# Prevent accidental usage.
	_out: tuple[()]

	@override
	def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
		"""
		Check the file against the compiled patterns.

		*file* (:class:`str`) is the file path to check.

		Returns a :class:`tuple` containing the include value (:class:`bool` or
		:data:`None`) of the matching pattern with the highest pattern index, and
		that pattern index (:class:`int` or :data:`None`). Both are :data:`None`
		when no pattern matched.
		"""
		# Start from "no match": the include value must be None (not False) so the
		# result agrees with the declared return type and with the state backends.
		out_include: Optional[bool] = None
		best_index = -1

		def on_match(
			expr_id: int, _from: int, _to: int, _flags: int, _context: Any,
		) -> Optional[bool]:
			# WARNING: Hyperscan does not guarantee matches will be produced in order!
			# Keep only the match with the highest pattern index.
			nonlocal out_include, best_index
			expr_dat = self._expr_data[expr_id]
			index = expr_dat.index
			if index > best_index:
				out_include = expr_dat.include
				best_index = index

		# The whole path is scanned as a single write to the stream.
		with self._db.stream(match_event_handler=on_match) as stream:
			stream.scan(file.encode('utf8'))

		# Translate the -1 sentinel into None for the caller.
		out_index: Optional[int] = None if best_index == -1 else best_index
		return (out_include, out_index)


# WARNING: This segfaults.
class HyperscanPsR1StreamStateBackend(_HyperscanPsR1StreamBaseBackend):
	"""
	The :class:`HyperscanPsR1StreamStateBackend` class uses a hyperscan database
	in streaming mode for matching files, and stores state in variables.
	"""

	def __init__(self, patterns: Sequence[RegexPattern]) -> None:
		"""
		Initialize the :class:`HyperscanPsR1StreamStateBackend` instance.

		*patterns* (:class:`~collections.abc.Sequence` of :class:`.RegexPattern`)
		is passed on to the base class initializer.
		"""
		super().__init__(patterns)

		# The running result of the current scan: the include value and the pattern
		# index of the best match so far. An index of -1 means "no match yet".
		self._out: tuple[Optional[bool], int] = (None, -1)

	@override
	def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
		"""
		Check the file against the compiled patterns.

		*file* (:class:`str`) is the file path to check.

		Returns a :class:`tuple` containing the include value (:class:`bool` or
		:data:`None`) of the matching pattern with the highest pattern index, and
		that pattern index (:class:`int` or :data:`None`). Both are :data:`None`
		when no pattern matched.
		"""
		# Reset the state left over from the previous scan.
		self._out = (None, -1)

		# NOTE(review): The segfault warned about on this class presumably comes
		# from the temporary bound method being released while the stream still
		# refers to it; the closure variant keeps its handler in a local and does
		# not carry that warning. Holding the handler in a local for the lifetime of
		# the stream is harmless either way -- TODO confirm that it resolves the
		# crash before relying on this backend.
		on_match = self.__on_match
		with self._db.stream(match_event_handler=on_match) as stream:
			stream.scan(file.encode('utf8'))

		# Translate the -1 sentinel into None for the caller.
		out_include, out_index = self._out
		if out_index == -1:
			out_index = None

		return (out_include, out_index)

	# NOTE: This method is not decorated with `@override`. Its double-underscore
	# name is mangled per class, so it cannot override a base class method.
	def __on_match(
		self,
		expr_id: int,
		_from: int,
		_to: int,
		_flags: int,
		_context: Any,
	) -> Optional[bool]:
		"""
		Record a match reported during a scan. This is passed to the stream as the
		match event handler.

		*expr_id* (:class:`int`) is the id of the matched expression, which is the
		position of its entry in the expression data.

		The remaining arguments are not used.

		Always returns :data:`None`.
		"""
		# WARNING: Hyperscan does not guarantee matches will be produced in order!
		# Keep only the match with the highest pattern index.
		expr_dat = self._expr_data[expr_id]
		index = expr_dat.index
		prev_index = self._out[1]
		if index > prev_index:
			self._out = (expr_dat.include, index)