import datetime
from os import getenv
from time import sleep
from typing import Any, Optional

from moto.batch.exceptions import ClientException
from moto.batch.models import BatchBackend, Job, batch_backends
from moto.core.base_backend import BackendDict

class BatchSimpleBackend(BatchBackend):
    """
    Implements a Batch backend that does not use Docker containers.
    Submitted jobs are marked as Success by default.

    Set the environment variable MOTO_SIMPLE_BATCH_FAIL_AFTER=0 to fail jobs immediately,
    or set it to a positive integer to control after how many seconds the job fails.

    Annotate your tests with the `@mock_aws(config={"batch": {"use_docker": False}})`
    decorator to use this Batch implementation.
    """

    @property
    def backend(self) -> BatchBackend:
        return batch_backends[self.account_id][self.region_name]

    def __getattribute__(self, name: str) -> Any:
        """
        Magic part that makes this class behave like a wrapper around the regular batch_backend.
        We intercept calls to `submit_job` and replace them with our own (non-Docker) implementation.
        Every other method call is sent through to batch_backend.
        """
        if name in [
            "backend",
            "account_id",
            "region_name",
            "urls",
            "_url_module",
            "__class__",
            "url_bases",
        ]:
            return object.__getattribute__(self, name)
        if name in ["submit_job", "_mark_job_as_finished"]:

            def newfunc(*args: Any, **kwargs: Any) -> Any:
                attr = object.__getattribute__(self, name)
                return attr(*args, **kwargs)

            return newfunc
        else:
            return object.__getattribute__(self.backend, name)
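
    # Delegation sketch (illustrative only): with an instance of this class, a call
    # such as `describe_jobs(...)` is resolved on the real BatchBackend for the same
    # account/region, while `submit_job(...)` and `_mark_job_as_finished(...)` stay
    # in this class and never start a container.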

    def submit_job(
        self,
        job_name: str,
        job_def_id: str,
        job_queue: str,
        array_properties: dict[str, Any],
        depends_on: Optional[list[dict[str, str]]] = None,
        container_overrides: Optional[dict[str, Any]] = None,
        timeout: Optional[dict[str, int]] = None,
        parameters: Optional[dict[str, str]] = None,
        tags: Optional[dict[str, str]] = None,
    ) -> tuple[str, str, str]:
        # Look for job definition
        job_def = self.get_job_definition(job_def_id)
        if job_def is None:
            raise ClientException(f"Job definition {job_def_id} does not exist")

        queue = self.get_job_queue(job_queue)
        if queue is None:
            raise ClientException(f"Job queue {job_queue} does not exist")

        job = Job(
            job_name,
            job_def,
            queue,
            self,
            log_backend=self.logs_backend,
            container_overrides=container_overrides,
            depends_on=depends_on,
            all_jobs=self._jobs,
            timeout=timeout,
            array_properties=array_properties,
            parameters=parameters,
            tags=tags,
        )
        if "size" in array_properties:
            child_jobs: list[Job] = []
            for array_index in range(array_properties.get("size", 0)):
                provided_job_id = f"{job.job_id}:{array_index}"
                child_job = Job(
                    job_name,
                    job_def,
                    queue,
                    self,
                    log_backend=self.logs_backend,
                    container_overrides=container_overrides,
                    depends_on=depends_on,
                    all_jobs=self._jobs,
                    timeout=timeout,
                    array_properties={"statusSummary": {}, "index": array_index},
                    provided_job_id=provided_job_id,
                    parameters=parameters,
                )
                self._mark_job_as_finished(include_start_attempt=True, job=child_job)
                child_jobs.append(child_job)
            self._mark_job_as_finished(include_start_attempt=False, job=job)
            job._child_jobs = child_jobs
        else:
            self._mark_job_as_finished(include_start_attempt=True, job=job)

        return job_name, job.job_id, job.arn
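
    # Sketch of the array-job branch above from a caller's perspective (illustrative
    # values; assumes a queue and job definition were registered beforehand):
    #
    #     client.submit_job(
    #         jobName="array-job",
    #         jobQueue="test-queue",
    #         jobDefinition="test-def",
    #         arrayProperties={"size": 3},
    #     )
    #     # Creates the parent job plus child jobs "<job-id>:0" .. "<job-id>:2",
    #     # each immediately marked as finished.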

    def _mark_job_as_finished(self, include_start_attempt: bool, job: Job) -> None:
        self.backend._jobs[job.job_id] = job
        job.job_started_at = datetime.datetime.now()
        job.log_stream_name = job._stream_name
        if include_start_attempt:
            job._start_attempt()
        # We don't want to actually run the job - just mark it as succeeded or failed,
        # depending on whether the env var MOTO_SIMPLE_BATCH_FAIL_AFTER is set.
        # If MOTO_SIMPLE_BATCH_FAIL_AFTER is set to an integer, we sleep for that many
        # seconds before failing the job.
        should_batch_fail = getenv("MOTO_SIMPLE_BATCH_FAIL_AFTER")
        if should_batch_fail:
            try:
                batch_fail_delay = int(should_batch_fail)
                sleep(batch_fail_delay)
            except ValueError:
                # Unable to parse the value of MOTO_SIMPLE_BATCH_FAIL_AFTER as an integer
                pass
            # Fail the job
            job._mark_stopped(success=False)
        else:
            job._mark_stopped(success=True)
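
    # Sketch of driving the failure path in a test (hypothetical test, using pytest's
    # monkeypatch fixture to set the environment variable):
    #
    #     def test_failed_job(monkeypatch):
    #         monkeypatch.setenv("MOTO_SIMPLE_BATCH_FAIL_AFTER", "0")
    #         # submit_job(...) now marks the job as FAILED immediately;
    #         # a value like "5" would sleep for five seconds first.
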
batch_simple_backends = BackendDict(BatchSimpleBackend, "batch")