1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
|
"""
Python polyfills for itertools
"""
from __future__ import annotations
import itertools
import sys
from typing import Generator, Iterable, Iterator, TypeVar
from ..decorators import substitute_in_graph
__all__ = [
"chain",
"chain_from_iterable",
"islice",
"tee",
"compress",
]
_T = TypeVar("_T")
_U = TypeVar("_U")
# Reference: https://docs.python.org/3/library/itertools.html#itertools.chain
@substitute_in_graph(itertools.chain, is_embedded_type=True) # type: ignore[arg-type]
def chain(*iterables: Iterable[_T]) -> Iterator[_T]:
for iterable in iterables:
yield from iterable
@substitute_in_graph(itertools.chain.from_iterable) # type: ignore[arg-type]
def chain_from_iterable(iterable: Iterable[Iterable[_T]], /) -> Iterator[_T]:
return itertools.chain(*iterable)
chain.from_iterable = chain_from_iterable # type: ignore[method-assign]
# Reference: https://docs.python.org/3/library/itertools.html#itertools.islice
@substitute_in_graph(itertools.islice, is_embedded_type=True) # type: ignore[arg-type]
def islice(iterable: Iterable[_T], /, *args: int | None) -> Iterator[_T]:
s = slice(*args)
start = 0 if s.start is None else s.start
stop = s.stop
step = 1 if s.step is None else s.step
if start < 0 or (stop is not None and stop < 0) or step <= 0:
raise ValueError(
"Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize.",
)
if stop is None:
# TODO: use indices = itertools.count() and merge implementation with the else branch
# when we support infinite iterators
next_i = start
for i, element in enumerate(iterable):
if i == next_i:
yield element
next_i += step
else:
indices = range(max(start, stop))
next_i = start
for i, element in zip(indices, iterable):
if i == next_i:
yield element
next_i += step
# Reference: https://docs.python.org/3/library/itertools.html#itertools.pairwise
if sys.version_info >= (3, 10):
@substitute_in_graph(itertools.pairwise, is_embedded_type=True) # type: ignore[arg-type]
def pairwise(iterable: Iterable[_T], /) -> Iterator[tuple[_T, _T]]:
a = None
first = True
for b in iterable:
if first:
first = False
else:
yield a, b # type: ignore[misc]
a = b
__all__ += ["pairwise"]
# Reference: https://docs.python.org/3/library/itertools.html#itertools.tee
@substitute_in_graph(itertools.tee)
def tee(iterable: Iterable[_T], n: int = 2, /) -> tuple[Iterator[_T], ...]:
iterator = iter(iterable)
shared_link = [None, None]
def _tee(link) -> Iterator[_T]: # type: ignore[no-untyped-def]
try:
while True:
if link[1] is None:
link[0] = next(iterator)
link[1] = [None, None]
value, link = link
yield value
except StopIteration:
return
return tuple(_tee(shared_link) for _ in range(n))
# Reference: https://docs.python.org/3/library/itertools.html#itertools.compress
@substitute_in_graph(itertools.compress, is_embedded_type=True) # type: ignore[arg-type]
def compress(
data: Iterable[_T], selectors: Iterable[_U], /
) -> Generator[_T, None, None]:
return (datum for datum, selector in zip(data, selectors) if selector)
|