1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
|
import datetime
from collections import deque
from typing import Final, Optional
from moto.stepfunctions.parser.api import (
ActivityListItem,
Arn,
DescribeActivityOutput,
Name,
Timestamp,
)
class ActivityTask:
task_input: Final[str]
task_token: Final[str]
def __init__(self, task_token: str, task_input: str):
self.task_token = task_token
self.task_input = task_input
class Activity:
arn: Final[Arn]
name: Final[Name]
creation_date: Final[Timestamp]
_tasks: Final[deque[ActivityTask]]
def __init__(self, arn: Arn, name: Name, creation_date: Optional[Timestamp] = None):
self.arn = arn
self.name = name
self.creation_date = creation_date or datetime.datetime.now(
tz=datetime.timezone.utc
)
self._tasks = deque()
def add_task(self, task: ActivityTask):
self._tasks.append(task)
def get_task(self) -> Optional[ActivityTask]:
return self._tasks.popleft()
def to_describe_activity_output(self) -> DescribeActivityOutput:
return DescribeActivityOutput(
activityArn=self.arn, name=self.name, creationDate=self.creation_date
)
def to_activity_list_item(self) -> ActivityListItem:
return ActivityListItem(
activityArn=self.arn, name=self.name, creationDate=self.creation_date
)
|