1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
"""Helper functions for parsing strings."""
import base64
import ipaddress
import json
import struct
from datetime import datetime
from typing import Any, Optional, Union
from collections.abc import Iterator
from requests import Response
from .output_utils import demux_output
def parse_repository(name: str) -> tuple[str, Optional[str]]:
"""Parse repository image name from tag or digest
Returns:
item 1: repository name
item 2: Either digest and tag, tag, or None
"""
# split image name and digest
elements = name.split("@", 1)
if len(elements) == 2:
return elements[0], elements[1]
# split repository and image name from tag
# tags need to be split from the right since
# a port number might increase the split list len by 1
elements = name.rsplit(":", 1)
if len(elements) == 2 and "/" not in elements[1]:
return elements[0], elements[1]
return name, None
def decode_header(value: Optional[str]) -> dict[str, Any]:
"""Decode a base64 JSON header value."""
if value is None:
return {}
value = base64.b64decode(value)
text = value.decode("utf-8")
return json.loads(text)
def prepare_timestamp(value: Union[datetime, int, None]) -> Optional[int]:
"""Returns a UTC UNIX timestamp from given input."""
if value is None:
return None
if isinstance(value, int):
return value
if isinstance(value, datetime):
delta = value - datetime.utcfromtimestamp(0)
return delta.seconds + delta.days * 24 * 3600
raise ValueError(f"Type '{type(value)}' is not supported by prepare_timestamp()")
def prepare_cidr(value: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]) -> (str, str):
"""Returns network address and Base64 encoded netmask from CIDR.
The return values are dictated by the Go JSON decoder.
"""
return str(value.network_address), base64.b64encode(value.netmask.packed).decode("utf-8")
def frames(response: Response) -> Iterator[bytes]:
"""Returns each frame from multiplexed payload, all results are expected in the payload.
The stdout and stderr frames are undifferentiated as they are returned.
"""
length = len(response.content)
index = 0
while length - index > 8:
header = response.content[index : index + 8]
_, frame_length = struct.unpack_from(">BxxxL", header)
frame_begin = index + 8
frame_end = frame_begin + frame_length
index = frame_end
yield response.content[frame_begin:frame_end]
def stream_frames(
response: Response, demux: bool = False
) -> Iterator[Union[bytes, tuple[bytes, bytes]]]:
"""Returns each frame from multiplexed streamed payload.
If ``demux`` then output will be tuples where the first position is ``STDOUT`` and the second
is ``STDERR``.
"""
while True:
header = response.raw.read(8)
if not header:
return
_, frame_length = struct.unpack_from(">BxxxL", header)
if not frame_length:
continue
data = response.raw.read(frame_length)
if demux:
data = demux_output(header + data)
if not data:
return
yield data
def stream_helper(
response: Response, decode_to_json: bool = False
) -> Union[Iterator[bytes], Iterator[dict[str, Any]]]:
"""Helper to stream results and optionally decode to json"""
for value in response.iter_lines():
if decode_to_json:
yield json.loads(value)
else:
yield value
|