1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
import binascii
import struct
from typing import Any, Optional
def parse_query(text_input: str, query: str) -> tuple[list[dict[str, Any]], int]:
from py_partiql_parser import S3SelectParser
parser = S3SelectParser(source_data=text_input)
result = parser.parse(query)
return result, parser.bytes_scanned
def _create_header(key: bytes, value: bytes) -> bytes:
return struct.pack("b", len(key)) + key + struct.pack("!bh", 7, len(value)) + value
def _create_message(
content_type: Optional[bytes], event_type: bytes, payload: bytes
) -> bytes:
headers = _create_header(b":message-type", b"event")
headers += _create_header(b":event-type", event_type)
if content_type is not None:
headers += _create_header(b":content-type", content_type)
headers_length = struct.pack("!I", len(headers))
total_length = struct.pack("!I", len(payload) + len(headers) + 16)
prelude = total_length + headers_length
prelude_crc = struct.pack("!I", binascii.crc32(total_length + headers_length))
message_crc = struct.pack(
"!I", binascii.crc32(prelude + prelude_crc + headers + payload)
)
return prelude + prelude_crc + headers + payload + message_crc
def _create_stats_message(bytes_scanned: int, bytes_returned: int) -> bytes:
stats = f"<Stats><BytesScanned>{bytes_scanned}</BytesScanned><BytesProcessed>{bytes_scanned}</BytesProcessed><BytesReturned>{bytes_returned}</BytesReturned></Stats>"
return _create_message(
content_type=b"text/xml", event_type=b"Stats", payload=stats.encode("utf-8")
)
def _create_data_message(payload: bytes) -> bytes:
# https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
return _create_message(
content_type=b"application/octet-stream", event_type=b"Records", payload=payload
)
def _create_end_message() -> bytes:
return _create_message(content_type=None, event_type=b"End", payload=b"")
def serialize_select(data_list: list[bytes], bytes_scanned: int) -> bytes:
bytes_returned = 0
all_data = b""
for data in data_list:
bytes_returned += len(data)
all_data += data
response = _create_data_message(all_data)
return (
response
+ _create_stats_message(bytes_scanned, bytes_returned)
+ _create_end_message()
)
|