1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
|
"""Model and Manager for Container resources."""
import json
import logging
import shlex
from contextlib import suppress
from typing import Any, Optional, Union
from collections.abc import Iterable, Iterator, Mapping
import requests
from podman import api
from podman.api.output_utils import demux_output
from podman.domain.images import Image
from podman.domain.images_manager import ImagesManager
from podman.domain.manager import PodmanResource
from podman.errors import APIError
logger = logging.getLogger("podman.containers")
class Container(PodmanResource):
"""Details and configuration for a container managed by the Podman service."""
@property
def name(self):
    """Optional[str]: Returns container's name, or None when it cannot be determined.

    The "Name" attribute is preferred; otherwise the first entry of "Names" is
    used. Any leading "/" is stripped from the value.
    """
    if "Name" in self.attrs:
        return self.attrs["Name"].lstrip("/")

    try:
        return self.attrs["Names"][0].lstrip("/")
    except (KeyError, IndexError, TypeError):
        # "Names" is missing, empty, or None: there is no name to report.
        return None
@property
def image(self):
    """podman.domain.images.Image: Image the container was created from.

    When attrs hold no "Image" entry, an empty ``Image`` is returned instead.
    """
    if "Image" not in self.attrs:
        return Image()

    images = ImagesManager(client=self.client)
    return images.get(self.attrs["Image"])
@property
def labels(self):
    """dict[str, str]: Labels associated with container; empty dict when none are found."""
    try:
        if "Labels" in self.attrs:
            # Layout of a Container created from ``list()`` operation
            found = self.attrs["Labels"]
        else:
            # Layout of a Container created from ``get()`` operation
            found = self.attrs["Config"].get("Labels", {})
    except KeyError:
        found = None

    # A missing or None value is normalised to an empty dict.
    return found if found else {}
@property
def status(self):
    """Literal["created", "initialized", "running", "stopped", "exited", "unknown"]:
    Returns status of container.

    "unknown" is returned when no status can be read from attrs.
    """
    try:
        state = self.attrs["State"]
    except KeyError:
        return "unknown"

    # "State" may already be the status string rather than a mapping holding
    # "Status" (presumably for containers built from ``list()`` - confirm
    # against the service's list response).
    if isinstance(state, str):
        return state

    try:
        return state["Status"]
    except (KeyError, TypeError):
        return "unknown"
@property
def ports(self):
    """dict: Value stored under NetworkSettings -> Ports; empty dict when either key is absent."""
    try:
        return self.attrs["NetworkSettings"]["Ports"]
    except KeyError:
        return {}
def attach(self, **kwargs) -> Union[str, Iterator[str]]:
    """Attach to container's tty. Currently unimplemented.

    Keyword Args:
        stdout (bool): Include stdout. Default: True
        stderr (bool): Include stderr. Default: True
        stream (bool): Return iterator of string(s) vs single string. Default: False
        logs (bool): Include previous container output. Default: False

    Raises:
        NotImplementedError: always, the method is not implemented.
    """
    raise NotImplementedError
def attach_socket(self, **kwargs):
    """Placeholder; attaching via socket is not implemented.

    Raises:
        NotImplementedError: always, the method is not implemented.
    """
    raise NotImplementedError
def commit(self, repository: str = None, tag: str = None, **kwargs) -> Image:
    """Commit the container as an image in the given repository.

    Args:
        repository: Where to save Image
        tag: Tag to push with Image

    Keyword Args:
        author (str): Name of commit author
        changes (list[str]): Instructions to apply during commit
        comment (str): Commit message to include with Image, overrides keyword message
        conf (dict[str, Any]): Ignored.
        format (str): Format of the image manifest and metadata
        message (str): Commit message to include with Image
        pause (bool): Pause the container before committing it

    Returns:
        The Image looked up by the "Id" reported in the commit response.
    """
    # "comment" wins; "message" is only consulted when "comment" was not passed.
    fallback_message = kwargs.get("message")
    commit_message = kwargs.get("comment", fallback_message)

    query = {
        "author": kwargs.get("author"),
        "changes": kwargs.get("changes"),
        "comment": commit_message,
        "container": self.id,
        "format": kwargs.get("format"),
        "pause": kwargs.get("pause"),
        "repo": repository,
        "tag": tag,
    }

    result = self.client.post("/commit", params=query)
    result.raise_for_status()

    image_id = result.json()["Id"]
    return ImagesManager(client=self.client).get(image_id)
def diff(self) -> list[dict[str, int]]:
    """Fetch the filesystem changes the service reports for this container.

    Returns:
        The decoded JSON body of the ``changes`` endpoint.

    Raises:
        APIError: when service reports an error
    """
    endpoint = f"/containers/{self.id}/changes"
    changes = self.client.get(endpoint)
    changes.raise_for_status()
    return changes.json()
# pylint: disable=too-many-arguments
def exec_run(
    self,
    cmd: Union[str, list[str]],
    *,
    stdout: bool = True,
    stderr: bool = True,
    stdin: bool = False,
    tty: bool = False,
    privileged: bool = False,
    user=None,
    detach: bool = False,
    stream: bool = False,
    socket: bool = False,  # pylint: disable=unused-argument
    environment: Union[Mapping[str, str], list[str]] = None,
    workdir: str = None,
    demux: bool = False,
) -> tuple[
    Optional[int],
    Union[Iterator[Union[bytes, tuple[bytes, bytes]]], Any, tuple[bytes, bytes]],
]:
    """Run given command inside container and return results.

    An exec session is created, started, and (unless streaming) inspected to
    obtain the exit code.

    Args:
        cmd: Command to be executed. A string is split with ``shlex.split``.
        stdout: Attach to stdout. Default: True
        stderr: Attach to stderr. Default: True
        stdin: Attach to stdin. Default: False
        tty: Allocate a pseudo-TTY. Default: False
        privileged: Run as privileged.
        user: User to execute command as.
        detach: If true, detach from the exec command.
            Default: False
        stream: Stream response data. Ignored if ``detach`` is ``True``. Default: False
        socket: Accepted for interface compatibility; currently unused.
        environment: A mapping or a list[str] in
            the following format ["PASSWORD=xxx"] or
            {"PASSWORD": "xxx"}.
        workdir: Path to working directory for this exec session
        demux: Return stdout and stderr separately

    Returns:
        A tuple of (``response_code``, ``output``).
            ``response_code``:
                The exit code of the provided command. ``None`` if ``stream``.
            ``output``:
                If ``stream``, then a generator yielding response chunks.
                If ``demux``, then a tuple of (``stdout``, ``stderr``).
                Else the response content.

    Raises:
        APIError: when service reports error
    """
    # pylint: disable-msg=too-many-locals
    # Any Mapping (not only dict) is flattened into "KEY=value" entries.
    if isinstance(environment, Mapping):
        environment = [f"{k}={v}" for k, v in environment.items()]

    data = {
        "AttachStderr": stderr,
        "AttachStdin": stdin,
        "AttachStdout": stdout,
        "Cmd": cmd if isinstance(cmd, list) else shlex.split(cmd),
        # "DetachKeys": detach,  # This is something else
        "Env": environment,
        "Privileged": privileged,
        "Tty": tty,
        "WorkingDir": workdir,
    }
    if user:
        data["User"] = user

    # Streaming makes no sense for a detached session.
    stream = stream and not detach

    # create the exec instance; addressed by id like every other container call
    response = self.client.post(f"/containers/{self.id}/exec", data=json.dumps(data))
    response.raise_for_status()
    exec_id = response.json()['Id']

    # start the exec instance, this will store command output
    start_resp = self.client.post(
        f"/exec/{exec_id}/start", data=json.dumps({"Detach": detach, "Tty": tty}), stream=stream
    )
    start_resp.raise_for_status()

    if stream:
        return None, api.stream_frames(start_resp, demux=demux)

    # get and return exec information
    response = self.client.get(f"/exec/{exec_id}/json")
    response.raise_for_status()
    exit_code = response.json().get('ExitCode')

    if demux:
        stdout_data, stderr_data = demux_output(start_resp.content)
        return exit_code, (stdout_data, stderr_data)
    return exit_code, start_resp.content
def export(self, chunk_size: int = api.DEFAULT_CHUNK_SIZE) -> Iterator[bytes]:
    """Stream the container's filesystem contents as a tar archive.

    Args:
        chunk_size: <= number of bytes to return for each iteration of the generator.

    Yields:
        Successive chunks of the export response body.

    Raises:
        NotFound: when container has been removed from service
        APIError: when service reports an error
    """
    # Being a generator, nothing below runs until the caller starts iterating.
    archive = self.client.get(f"/containers/{self.id}/export", stream=True)
    archive.raise_for_status()

    for chunk in archive.iter_content(chunk_size=chunk_size):
        yield chunk
def get_archive(
    self, path: str, chunk_size: int = api.DEFAULT_CHUNK_SIZE
) -> tuple[Iterable, dict[str, Any]]:
    """Fetch a file or folder from the container's filesystem.

    Args:
        path: Path to file or folder.
        chunk_size: <= number of bytes to return for each iteration of the generator.

    Returns:
        First item is an iterator over the response body (raw tar data).
        Second item is the decoded "x-docker-container-path-stat" response header.
    """
    archive = self.client.get(f"/containers/{self.id}/archive", params={"path": [path]})
    archive.raise_for_status()

    # The stat information arrives encoded in a response header.
    encoded_stat = archive.headers.get("x-docker-container-path-stat", None)
    path_stat = api.decode_header(encoded_stat)

    content = archive.iter_content(chunk_size=chunk_size)
    return content, path_stat
def init(self) -> None:
    """Ask the service to initialize the container.

    A failing response status is raised via ``raise_for_status()``.
    """
    endpoint = f"/containers/{self.id}/init"
    self.client.post(endpoint).raise_for_status()
def inspect(self) -> dict:
    """Retrieve the service's JSON description of this container.

    Returns:
        The decoded JSON body of the container's ``json`` endpoint.

    Raises:
        APIError: when service reports an error
    """
    endpoint = f"/containers/{self.id}/json"
    details = self.client.get(endpoint)
    details.raise_for_status()
    return details.json()
def kill(self, signal: Union[str, int, None] = None) -> None:
    """Deliver a signal to the container.

    Args:
        signal: Signal to send; forwarded unchanged as the ``signal`` query parameter.

    Raises:
        APIError: when service reports an error
    """
    query = {"signal": signal}
    outcome = self.client.post(f"/containers/{self.id}/kill", params=query)
    outcome.raise_for_status()
def logs(self, **kwargs) -> Union[bytes, Iterator[bytes]]:
    """Retrieve the container's logs.

    Keyword Args:
        stdout (bool): Include stdout. Default: True
        stderr (bool): Include stderr. Default: True
        stream (bool): Return generator of strings as the response. Default: False
        timestamps (bool): Show timestamps in output. Default: False
        tail (Union[str, int]): Output specified number of lines at the end of
            logs.  Integer representing the number of lines to display, or the string all.
            Default: all
        since (Union[datetime, int]): Show logs since a given datetime or
            integer epoch (in seconds)
        follow (bool): Follow log output. Default: False
        until (Union[datetime, int]): Show logs that occurred before the given
            datetime or integer epoch (in seconds)

    Returns:
        ``api.stream_frames(response)`` when ``stream`` is truthy, otherwise
        ``api.frames(response)``.
    """
    want_stream = bool(kwargs.get("stream", False))

    # "follow" falls back to the raw "stream" keyword when not given explicitly.
    follow = kwargs.get("follow", kwargs.get("stream", None))

    query = {
        "follow": follow,
        "since": api.prepare_timestamp(kwargs.get("since")),
        "stderr": kwargs.get("stderr", True),
        "stdout": kwargs.get("stdout", True),
        "tail": kwargs.get("tail"),
        "timestamps": kwargs.get("timestamps"),
        "until": api.prepare_timestamp(kwargs.get("until")),
    }

    response = self.client.get(
        f"/containers/{self.id}/logs", stream=want_stream, params=query
    )
    response.raise_for_status()

    reader = api.stream_frames if want_stream else api.frames
    return reader(response)
def pause(self) -> None:
    """Ask the service to pause the container's processes.

    A failing response status is raised via ``raise_for_status()``.
    """
    endpoint = f"/containers/{self.id}/pause"
    self.client.post(endpoint).raise_for_status()
def put_archive(self, path: str, data: bytes = None) -> bool:
    """Upload a tar archive to be written into the container.

    Args:
        path: File to write data into
        data: Contents to write to file, when None path will be read on client to
              build tarfile.

    Returns:
        The ``ok`` flag of the service response.

    Raises:
        ValueError: when ``path`` is None
    """
    if path is None:
        raise ValueError("'path' is a required argument.")

    # Without explicit data, the archive is built from ``path`` via api.create_tar.
    payload = api.create_tar("/", path) if data is None else data

    upload = self.client.put(
        f"/containers/{self.id}/archive", params={"path": path}, data=payload
    )
    return upload.ok
def remove(self, **kwargs) -> None:
    """Delete this container by delegating to its manager.

    Keyword Args:
        v (bool): Delete associated volumes as well.
        link (bool): Ignored.
        force (bool): Kill a running container before deleting.
    """
    container_id = self.id
    self.manager.remove(container_id, **kwargs)
def rename(self, name: str) -> None:
    """Give the container a new name.

    On success the cached "Name" attribute is overwritten locally, so callers
    do not need to reload().

    Args:
        name: New name for container.

    Raises:
        ValueError: when ``name`` is empty or None
    """
    if not name:
        raise ValueError("'name' is a required argument.")

    endpoint = f"/containers/{self.id}/rename"
    outcome = self.client.post(endpoint, params={"name": name})
    outcome.raise_for_status()

    # Only reached when the service accepted the rename.
    self.attrs["Name"] = name
def resize(self, height: int = None, width: int = None) -> None:
    """Change the dimensions of the tty session.

    Args:
        height: New height of tty session.
        width: New width of tty session.
    """
    # The endpoint takes the dimensions as "h" and "w" query parameters.
    dimensions = {"h": height, "w": width}
    outcome = self.client.post(f"/containers/{self.id}/resize", params=dimensions)
    outcome.raise_for_status()
def restart(self, **kwargs) -> None:
    """Restart the container's processes.

    Keyword Args:
        timeout (int): Seconds to wait for container to stop before killing container.
    """
    timeout = kwargs.get("timeout")

    # With a truthy timeout, the request itself is given 1.5x that long.
    request_options = {"timeout": float(timeout) * 1.5} if timeout else {}

    outcome = self.client.post(
        f"/containers/{self.id}/restart", params={"timeout": timeout}, **request_options
    )
    outcome.raise_for_status()
def start(self, **kwargs) -> None:
    """Start the container's processes.

    Keyword Args:
        detach_keys: Override the key sequence for detaching a container (Podman only)
    """
    # The keyword is sent to the service as the "detachKeys" query parameter.
    query = {"detachKeys": kwargs.get("detach_keys")}
    outcome = self.client.post(f"/containers/{self.id}/start", params=query)
    outcome.raise_for_status()
def stats(
    self, **kwargs
) -> Union[bytes, dict[str, Any], Iterator[bytes], Iterator[dict[str, Any]]]:
    """Fetch statistics for the container.

    Keyword Args:
        decode (bool): When True the payload is decoded from JSON, both for the
            streamed and the one-shot response. Default: False.
        stream (bool): Stream statistics until cancelled. Default: True.

    Returns:
        When streaming, the result of ``api.stream_helper``; otherwise the raw
        response content, or its JSON-decoded form when ``decode`` is set.

    Raises:
        APIError: when service reports an error
    """
    # FIXME Errors in stream are not handled, need content and json to read Errors.
    want_stream = kwargs.get("stream", True)
    want_decoded = kwargs.get("decode", False)

    query = {"containers": self.id, "stream": want_stream}
    response = self.client.get("/containers/stats", params=query, stream=want_stream)
    response.raise_for_status()

    if want_stream:
        return api.stream_helper(response, decode_to_json=want_decoded)

    if want_decoded:
        return json.loads(response.content)
    return response.content
def stop(self, **kwargs) -> None:
    """Stop container.

    Keyword Args:
        all (bool): When True, stop all containers. Default: False (Podman only)
        ignore (bool): When True, ignore error if container already stopped (Podman only)
        timeout (int): Number of seconds to wait on container to stop before killing it.

    Raises:
        APIError: when the response status is neither "no content" nor an
            ignored "not modified"
    """
    params = {"all": kwargs.get("all"), "timeout": kwargs.get("timeout")}

    post_kwargs = {}
    if kwargs.get("timeout"):
        # Give the request itself 1.5x the stop timeout.
        post_kwargs["timeout"] = float(params["timeout"]) * 1.5

    response = self.client.post(f"/containers/{self.id}/stop", params=params, **post_kwargs)
    response.raise_for_status()

    if response.status_code == requests.codes.no_content:
        return

    if response.status_code == requests.codes.not_modified and kwargs.get("ignore", False):
        return

    # A "not modified" response has no body, so decoding may fail; the caller
    # should still receive an APIError rather than a JSON decoding error.
    try:
        body = response.json()
    except ValueError:
        body = {}
    if not isinstance(body, dict):
        body = {}

    raise APIError(
        body.get("cause") or response.reason or "unexpected response from service",
        response=response,
        explanation=body.get("message"),
    )
def top(self, **kwargs) -> Union[Iterator[dict[str, Any]], dict[str, Any]]:
    """Report on the processes running inside the container.

    Keyword Args:
        ps_args (str): When given, arguments will be passed to ps
        stream (bool): When True, repeatedly return results. Default: False

    Returns:
        When streaming, the result of ``api.stream_helper`` with JSON decoding
        enabled; otherwise the decoded JSON body.

    Raises:
        NotFound: when the container no longer exists
        APIError: when the service reports an error
    """
    want_stream = kwargs.get("stream", False)
    query = {"stream": want_stream, "ps_args": kwargs.get("ps_args")}

    report = self.client.get(
        f"/containers/{self.id}/top", params=query, stream=want_stream
    )
    report.raise_for_status()

    if not want_stream:
        return report.json()
    return api.stream_helper(report, decode_to_json=True)
def unpause(self) -> None:
    """Ask the service to resume the container's paused processes.

    A failing response status is raised via ``raise_for_status()``.
    """
    endpoint = f"/containers/{self.id}/unpause"
    self.client.post(endpoint).raise_for_status()
def update(self, **kwargs):
    """Placeholder for updating the container's resource configuration.

    Raises:
        NotImplementedError: always; Podman service unsupported operation.
    """
    reason = "Container.update() is not supported by Podman service."
    raise NotImplementedError(reason)
def wait(self, **kwargs) -> int:
    """Block until the container enters given state.

    Keyword Args:
        condition (Union[str, list[str]]): Container state on which to release.
            One or more of: "configured", "created", "running", "stopped",
            "paused", "exited", "removing", "stopping".
        interval (int): Time interval to wait before polling for completion.

    Returns:
        The decoded JSON body of the response, which the endpoint sends as a
        JSON encoded integer.

    Raises:
        NotFound: when Container not found
        ReadTimeoutError: when timeout is exceeded
        APIError: when service returns an error
    """
    condition = kwargs.get("condition")
    if isinstance(condition, str):
        condition = [condition]

    interval = kwargs.get("interval")

    # Only forward what the caller actually supplied: an omitted keyword
    # (None) is treated like an empty value and left out of the query.
    params = {}
    if condition:
        params["condition"] = condition
    if interval is not None and interval != "":
        params["interval"] = interval

    # This API endpoint responds with a JSON encoded integer.
    # See:
    # https://docs.podman.io/en/latest/_static/api.html#tag/containers/operation/ContainerWaitLibpod
    response = self.client.post(f"/containers/{self.id}/wait", params=params)
    response.raise_for_status()
    return response.json()
|