1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
|
"""
Process and analyze your data with tasks in the InfluxDB task engine.
Use tasks (scheduled Flux queries) to input a data stream and then analyze, modify, and act on the data accordingly.
"""
import datetime
from typing import List
from influxdb_client import TasksService, Task, TaskCreateRequest, TaskUpdateRequest, LabelResponse, LabelMapping, \
AddResourceMemberRequestBody, RunManually, Run, LogEvent
class _Page:
def __init__(self, values, has_next, next_after):
self.has_next = has_next
self.values = values
self.next_after = next_after
@staticmethod
def empty():
return _Page([], False, None)
@staticmethod
def initial(after):
return _Page([], True, after)
class _PageIterator:
def __init__(self, page: _Page, get_next_page):
self.page = page
self.get_next_page = get_next_page
def __iter__(self):
return self
def __next__(self):
if not self.page.values:
if self.page.has_next:
self.page = self.get_next_page(self.page)
if not self.page.values:
raise StopIteration
return self.page.values.pop(0)
class TasksApi(object):
    """Implementation for '/api/v2/tasks' endpoint."""

    def __init__(self, influxdb_client):
        """Initialize defaults.

        :param influxdb_client: client whose ``api_client`` is handed to the ``TasksService``
        """
        self._influxdb_client = influxdb_client
        self._service = TasksService(influxdb_client.api_client)

    def find_task_by_id(self, task_id) -> Task:
        """Retrieve a task.

        :param task_id: ID of the task to retrieve
        :return: Task
        """
        return self._service.get_tasks_id(task_id)

    def find_tasks(self, **kwargs):
        """List all tasks up to set limit (max 500).

        :key str name: only returns tasks with the specified name
        :key str after: returns tasks after specified ID
        :key str user: filter tasks to a specific user ID
        :key str org: filter tasks to a specific organization name
        :key str org_id: filter tasks to a specific organization ID
        :key int limit: the number of tasks to return
        :return: Tasks
        """
        return self._service.get_tasks(**kwargs).tasks

    def find_tasks_iter(self, **kwargs):
        """Iterate over all tasks with pagination.

        :key str name: only returns tasks with the specified name
        :key str after: returns tasks after specified ID
        :key str user: filter tasks to a specific user ID
        :key str org: filter tasks to a specific organization name
        :key str org_id: filter tasks to a specific organization ID
        :key int limit: the number of tasks in one page
        :return: Tasks iterator
        """

        def get_next_page(page: _Page):
            # Every page is requested with the caller's filters; only the cursor changes.
            return self._find_tasks_next_page(page, **kwargs)

        return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page))

    def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task:
        """Create a new task.

        When both arguments are given, ``task_create_request`` takes precedence.

        :param task: task whose flux, org_id, org, description and status are copied into a request
        :param task_create_request: request that is posted as is
        :return: Task
        :raises ValueError: when neither ``task`` nor ``task_create_request`` is given
        """
        if task_create_request is not None:
            return self._service.post_tasks(task_create_request)

        if task is not None:
            request = TaskCreateRequest(flux=task.flux, org_id=task.org_id, org=task.org,
                                        description=task.description, status=task.status)
            return self.create_task(task_create_request=request)

        raise ValueError("task or task_create_request must be not None")

    @staticmethod
    def _task_option(name: str, every, cron) -> str:
        """Render the ``option task = {...}`` statement appended to a task's Flux script.

        Fields are joined with ``", "`` so that supplying both ``every`` and ``cron`` yields two
        separate fields, and supplying neither leaves no dangling separator.

        :param name: task name
        :param every: interval written unquoted after ``every:``; skipped when ``None``
        :param cron: expression written double-quoted after ``cron:``; skipped when ``None``
        :return: the option statement
        """
        fields = ['name: "{}"'.format(name)]
        if every is not None:
            fields.append('every: {}'.format(every))
        if cron is not None:
            fields.append('cron: "{}"'.format(cron))
        return 'option task = {{{}}}'.format(', '.join(fields))

    @staticmethod
    def _create_task(name: str, flux: str, every, cron, org_id: str) -> Task:
        """Build an active task whose Flux script carries the scheduling option.

        :param name: task name
        :param flux: Flux script; the task option is appended to it
        :param every: interval for the ``every`` field or ``None``
        :param cron: expression for the ``cron`` field or ``None``
        :param org_id: ID of the organization that owns the task
        :return: Task (not yet sent to the server)
        """
        task = Task(id=0, name=name, org_id=org_id, status="active", flux=flux)
        task.flux = '{} \n\n{}'.format(flux, TasksApi._task_option(name, every, cron))
        return task

    def create_task_every(self, name, flux, every, organization) -> Task:
        """Create a new task with every repetition schedule.

        :param name: task name
        :param flux: Flux script of the task
        :param every: interval written into the task option
        :param organization: object whose ``id`` is used as the owning organization ID
        :return: Task
        """
        task = self._create_task(name, flux, every, None, organization.id)
        return self.create_task(task)

    def create_task_cron(self, name: str, flux: str, cron: str, org_id: str) -> Task:
        """Create a new task with cron repetition schedule.

        :param name: task name
        :param flux: Flux script of the task
        :param cron: cron expression written into the task option
        :param org_id: ID of the owning organization
        :return: Task
        """
        task = self._create_task(name=name, flux=flux, cron=cron, org_id=org_id, every=None)
        return self.create_task(task)

    def delete_task(self, task_id: str):
        """Delete a task.

        :param task_id: ID of the task to delete; nothing is sent when it is ``None``
        :return: result of the delete call, or ``None`` when ``task_id`` is ``None``
        """
        if task_id is None:
            return None
        return self._service.delete_tasks_id(task_id=task_id)

    def update_task(self, task: Task) -> Task:
        """Update a task.

        :param task: task whose flux, description, every, cron, status and offset are sent;
                     its ``id`` selects the task to update
        :return: Task
        """
        req = TaskUpdateRequest(flux=task.flux, description=task.description, every=task.every,
                                cron=task.cron, status=task.status, offset=task.offset)
        return self.update_task_request(task_id=task.id, task_update_request=req)

    def update_task_request(self, task_id, task_update_request: TaskUpdateRequest) -> Task:
        """Update a task.

        :param task_id: ID of the task to update
        :param task_update_request: changes to apply
        :return: Task
        """
        return self._service.patch_tasks_id(task_id=task_id, task_update_request=task_update_request)

    def clone_task(self, task: Task) -> Task:
        """Clone a task.

        The clone is created as active with the name, org_id, org and flux of ``task``. When
        ``task`` has an ID, the labels of the source task are attached to the clone as well.

        :param task: task to clone
        :return: Task
        """
        cloned = Task(id=0, name=task.name, org_id=task.org_id, org=task.org, flux=task.flux, status="active")
        created = self.create_task(cloned)

        if task.id:
            labels = self.get_labels(task.id)
            # Tolerate a response without a label list instead of failing after the clone exists.
            for label in labels.labels or []:
                self.add_label(label.id, created.id)

        return created

    def get_labels(self, task_id):
        """List all labels for a task.

        :param task_id: task id
        :return: response of ``TasksService.get_tasks_id_labels``
        """
        return self._service.get_tasks_id_labels(task_id=task_id)

    def add_label(self, label_id: str, task_id: str) -> LabelResponse:
        """Add a label to a task.

        :param label_id: label id
        :param task_id: task id
        :return: LabelResponse
        """
        label_mapping = LabelMapping(label_id=label_id)
        return self._service.post_tasks_id_labels(task_id=task_id, label_mapping=label_mapping)

    def delete_label(self, label_id: str, task_id: str):
        """Delete a label from a task.

        :param label_id: label id
        :param task_id: task id
        """
        return self._service.delete_tasks_id_labels_id(task_id=task_id, label_id=label_id)

    def get_members(self, task_id: str):
        """List all task members.

        :param task_id: task id
        :return: ``users`` of the members response
        """
        return self._service.get_tasks_id_members(task_id=task_id).users

    def add_member(self, member_id, task_id):
        """Add a member to a task.

        :param member_id: ID of the user to add
        :param task_id: task id
        """
        user = AddResourceMemberRequestBody(id=member_id)
        return self._service.post_tasks_id_members(task_id=task_id, add_resource_member_request_body=user)

    def delete_member(self, member_id, task_id):
        """Remove a member from a task.

        :param member_id: ID of the user to remove
        :param task_id: task id
        """
        return self._service.delete_tasks_id_members_id(user_id=member_id, task_id=task_id)

    def get_owners(self, task_id):
        """List all owners of a task.

        :param task_id: task id
        :return: ``users`` of the owners response
        """
        return self._service.get_tasks_id_owners(task_id=task_id).users

    def add_owner(self, owner_id, task_id):
        """Add an owner to a task.

        :param owner_id: ID of the user to add
        :param task_id: task id
        """
        user = AddResourceMemberRequestBody(id=owner_id)
        return self._service.post_tasks_id_owners(task_id=task_id, add_resource_member_request_body=user)

    def delete_owner(self, owner_id, task_id):
        """Remove an owner from a task.

        :param owner_id: ID of the user to remove
        :param task_id: task id
        """
        return self._service.delete_tasks_id_owners_id(user_id=owner_id, task_id=task_id)

    def get_runs(self, task_id, **kwargs) -> List['Run']:
        """
        Retrieve list of run records for a task.

        :param task_id: task id
        :key str after: returns runs after specified ID
        :key int limit: the number of runs to return
        :key datetime after_time: filter runs to those scheduled after this time, RFC3339
        :key datetime before_time: filter runs to those scheduled before this time, RFC3339
        :return: ``runs`` of the runs response
        """
        return self._service.get_tasks_id_runs(task_id=task_id, **kwargs).runs

    def get_run(self, task_id: str, run_id: str) -> Run:
        """
        Get run record for specific task and run id.

        :param task_id: task id
        :param run_id: run id
        :return: Run for specified task and run id
        """
        return self._service.get_tasks_id_runs_id(task_id=task_id, run_id=run_id)

    def get_run_logs(self, task_id: str, run_id: str) -> List['LogEvent']:
        """Retrieve all logs for a run.

        :param task_id: task id
        :param run_id: run id
        :return: ``events`` of the logs response
        """
        return self._service.get_tasks_id_runs_id_logs(task_id=task_id, run_id=run_id).events

    def run_manually(self, task_id: str, scheduled_for: datetime.datetime = None):
        """
        Manually start a run of the task now overriding the current schedule.

        :param task_id: task id
        :param scheduled_for: planned execution
        """
        r = RunManually(scheduled_for=scheduled_for)
        return self._service.post_tasks_id_runs(task_id=task_id, run_manually=r)

    def retry_run(self, task_id: str, run_id: str):
        """
        Retry a task run.

        :param task_id: task id
        :param run_id: run id
        """
        return self._service.post_tasks_id_runs_id_retry(task_id=task_id, run_id=run_id)

    def cancel_run(self, task_id: str, run_id: str):
        """
        Cancel a currently running run.

        :param task_id: task id
        :param run_id: run id
        """
        return self._service.delete_tasks_id_runs_id(task_id=task_id, run_id=run_id)

    def get_logs(self, task_id: str) -> List['LogEvent']:
        """
        Retrieve all logs for a task.

        :param task_id: task id
        :return: ``events`` of the logs response
        """
        return self._service.get_tasks_id_logs(task_id=task_id).events

    def find_tasks_by_user(self, task_user_id):
        """List all tasks by user.

        :param task_user_id: user ID passed as the ``user`` filter of ``find_tasks``
        :return: Tasks
        """
        return self.find_tasks(user=task_user_id)

    def _find_tasks_next_page(self, page: _Page, **kwargs):
        """Fetch the page of tasks that follows ``page``.

        :param page: page that was just consumed
        :param kwargs: filters forwarded to ``TasksService.get_tasks``
        :return: the following page, or an empty page when ``page`` has no successor
        """
        if not page.has_next:
            return _Page.empty()

        # The cursor replaces any caller-supplied 'after'; without a cursor the filters go unchanged.
        args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs
        tasks_response = self._service.get_tasks(**args)

        tasks = tasks_response.tasks
        links = tasks_response.links
        # A response without a links object is treated as the last page.
        has_next = links is not None and links.next is not None
        last_id = tasks[-1].id if tasks else None

        return _Page(tasks, has_next, last_id)
|