File: task.py

package info (click to toggle)
siridb-server 2.0.53-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,612 kB
  • sloc: ansic: 47,501; python: 6,263; sh: 254; makefile: 149
file content (35 lines) | stat: -rw-r--r-- 970 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import sys
import time
import asyncio
from .spinner import Spinner
from .color import Color


class Task:
    """Console progress line for a single long-running step.

    Creating a ``Task`` schedules :meth:`process` with
    ``asyncio.ensure_future``. While ``running`` is true the title, padded
    with dots to 76 columns, is redrawn together with the current spinner
    frame. After :meth:`stop` has been called a final ``OK``/``FAILED`` line
    including the elapsed time is printed.

    Attributes:
        title: Text shown at the start of the progress line.
        running: True until :meth:`stop` is called.
        success: Outcome passed to :meth:`stop`; False until then.
        start: Wall-clock timestamp (``time.time()``) taken at creation.
        duration: Elapsed seconds, set by :meth:`stop`; 0.0 until then.
        task: The future returned by ``asyncio.ensure_future`` for
            :meth:`process`; await it to wait for the final line.
    """

    def __init__(self, title):
        """Initialise the state and schedule the progress coroutine.

        Args:
            title: Text shown at the start of the progress line.
        """
        self.title = title
        self.running = True
        self.success = False
        # Defined up front so the attribute exists even before stop() ran.
        self.duration = 0.0
        self.start = time.time()
        # Elapsed time is measured on the monotonic clock so that system
        # clock adjustments cannot distort (or negate) the reported duration.
        self._started = time.monotonic()
        # Scheduled last: the coroutine must only ever see a fully
        # initialised object.
        self.task = asyncio.ensure_future(self.process())

    def stop(self, success):
        """Record the outcome and let :meth:`process` print the final line.

        Args:
            success: True to report ``OK``, False to report ``FAILED``.
        """
        self.success = success
        self.duration = time.monotonic() - self._started
        # Cleared last so the result is complete once the loop is released.
        self.running = False

    async def process(self):
        """Redraw the progress line until stopped, then print the result."""
        spinner = Spinner()
        while self.running:
            # The trailing '\r' moves the cursor back to the start of the
            # line, so the next write overwrites this one in place.
            sys.stdout.write(f'{self.title:.<76}{spinner.next}\r')
            sys.stdout.flush()
            await asyncio.sleep(0.2)

        status = Color.success("OK") if self.success else Color.error("FAILED")
        print(f'{self.title:.<76}{status} ({self.duration:.2f} seconds)')