#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import fractions
import functools
import logging
import os
import string
import sys
import time

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow import exceptions
from taskflow.patterns import linear_flow
from taskflow import task


# In this example we show how a simple linear set of tasks can be executed
# using local processes (and not threads or remote workers) with minimal (if
# any) modification to those tasks to make them safe to run in this mode.
#
# This is useful since it allows further scaling up your workflows when thread
# execution starts to become a bottleneck (which it can start to be due to the
# GIL in python). It also offers a intermediary scalable runner that can be
# used when the scale and/or setup of remote workers is not desirable.


def progress_printer(task, event_type, details):
    # This callback, attached to each task will be called in the local
    # process (not the child processes)...
    progress = details.pop('progress')
    progress = int(progress * 100.0)
    print("Task '%s' reached %d%% completion" % (task.name, progress))


class AlphabetTask(task.Task):
    # Second delay between each progress part.
    _DELAY = 0.1

    # This task will run in X main stages (each with a different progress
    # report that will be delivered back to the running process...). The
    # initial 0% and 100% are triggered automatically by the engine when
    # a task is started and finished (so that's why those are not emitted
    # here).
    _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]

    def execute(self):
        for p in self._PROGRESS_PARTS:
            self.update_progress(p)
            time.sleep(self._DELAY)


print("Constructing...")
soup = linear_flow.Flow("alphabet-soup")
for letter in string.ascii_lowercase:
    abc = AlphabetTask(letter)
    abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
                          functools.partial(progress_printer, abc))
    soup.add(abc)
try:
    print("Loading...")
    e = engines.load(soup, engine='parallel', executor='processes')
    print("Compiling...")
    e.compile()
    print("Preparing...")
    e.prepare()
    print("Running...")
    e.run()
    print("Done: %s" % e.statistics)
except exceptions.NotImplementedError as e:
    print(e)
