#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import os
import string
import sys
import time

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier
from taskflow.utils import threading_utils

ANY = notifier.Notifier.ANY

# INTRO: These examples show how to use a remote worker's event notification
# attribute to proxy back task event notifications to the controlling process.
#
# In this case a simple set of events is triggered by a worker running a
# task (simulated to be remote by using a kombu memory transport and threads).
# Those events that the 'remote worker' produces will then be proxied back to
# the task that the engine is running 'remotely', and then they will be emitted
# back to the original callbacks that exist in the originating engine
# process/thread. This creates a one-way *notification* channel that can
# transparently be used in-process, outside-of-process using remote workers and
# so-on that allows tasks to signal to its controlling process some sort of
# action that has occurred that the task may need to tell others about (for
# example to trigger some type of response when the task reaches 50% done...).


def event_receiver(event_type, details):
    """This is the callback that (in this example) doesn't do much..."""
    print("Recieved event '%s'" % event_type)
    print("Details = %s" % details)


class EventReporter(task.Task):
    """This is the task that will be running 'remotely' (not really remote)."""

    EVENTS = tuple(string.ascii_uppercase)
    EVENT_DELAY = 0.1

    def execute(self):
        for i, e in enumerate(self.EVENTS):
            details = {
                'leftover': self.EVENTS[i:],
            }
            self.notifier.notify(e, details)
            time.sleep(self.EVENT_DELAY)


BASE_SHARED_CONF = {
    'exchange': 'taskflow',
    'transport': 'memory',
    'transport_options': {
        'polling_interval': 0.1,
    },
}

# Until https://github.com/celery/kombu/issues/398 is resolved it is not
# recommended to run many worker threads in this example due to the types
# of errors mentioned in that issue.
MEMORY_WORKERS = 1
WORKER_CONF = {
    'tasks': [
        # Used to locate which tasks we can run (we don't want to allow
        # arbitrary code/tasks to be ran by any worker since that would
        # open up a variety of vulnerabilities).
        '%s:EventReporter' % (__name__),
    ],
}


def run(engine_options):
    reporter = EventReporter()
    reporter.notifier.register(ANY, event_receiver)
    flow = lf.Flow('event-reporter').add(reporter)
    eng = engines.load(flow, engine='worker-based', **engine_options)
    eng.run()


if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those objects use it correctly.
    worker_conf = dict(WORKER_CONF)
    worker_conf.update(BASE_SHARED_CONF)
    engine_options = dict(BASE_SHARED_CONF)
    workers = []

    # These topics will be used to request worker information on; those
    # workers will respond with their capabilities which the executing engine
    # will use to match pending tasks to a matched worker, this will cause
    # the task to be sent for execution, and the engine will wait until it
    # is finished (a response is received) and then the engine will either
    # continue with other tasks, do some retry/failure resolution logic or
    # stop (and potentially re-raise the remote workers failure)...
    worker_topics = []

    try:
        # Create a set of worker threads to simulate actual remote workers...
        print('Running %s workers.' % (MEMORY_WORKERS))
        for i in range(0, MEMORY_WORKERS):
            # Give each one its own unique topic name so that they can
            # correctly communicate with the engine (they will all share the
            # same exchange).
            worker_conf['topic'] = 'worker-%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        result = run(engine_options)
        print('Execution finished.')
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
