# -*- coding: utf-8 -*-

#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import contextlib
import itertools
import logging
import os
import shutil
import socket
import sys
import tempfile
import threading
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from oslo_utils import timeutils
from oslo_utils import uuidutils
from zake import fake_client

from taskflow.conductors import backends as conductors
from taskflow import engines
from taskflow.jobs import backends as boards
from taskflow.patterns import linear_flow
from taskflow.persistence import backends as persistence
from taskflow.persistence import models
from taskflow import task
from taskflow.utils import threading_utils

# INTRO: This examples shows how a worker/producer can post desired work (jobs)
# to a jobboard and a conductor can consume that work (jobs) from that jobboard
# and execute those jobs in a reliable & async manner (for example, if the
# conductor were to crash then the job will be released back onto the jobboard
# and another conductor can attempt to finish it, from wherever that job last
# left off).
#
# In this example a in-memory jobboard (and in-memory storage) is created and
# used that simulates how this would be done at a larger scale (it is an
# example after all).

# Restrict how long this example runs for...
RUN_TIME = 5
REVIEW_CREATION_DELAY = 0.5
SCAN_DELAY = 0.1
NAME = "%s_%s" % (socket.getfqdn(), os.getpid())

# This won't really use zookeeper but will use a local version of it using
# the zake library that mimics an actual zookeeper cluster using threads and
# an in-memory data structure.
JOBBOARD_CONF = {
    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
}


class RunReview(task.Task):
    # A dummy task that clones the review and runs tox...

    def _clone_review(self, review, temp_dir):
        print("Cloning review '%s' into %s" % (review['id'], temp_dir))

    def _run_tox(self, temp_dir):
        print("Running tox in %s" % temp_dir)

    def execute(self, review, temp_dir):
        self._clone_review(review, temp_dir)
        self._run_tox(temp_dir)


class MakeTempDir(task.Task):
    # A task that creates and destroys a temporary dir (on failure).
    #
    # It provides the location of the temporary dir for other tasks to use
    # as they see fit.

    default_provides = 'temp_dir'

    def execute(self):
        return tempfile.mkdtemp()

    def revert(self, *args, **kwargs):
        temp_dir = kwargs.get(task.REVERT_RESULT)
        if temp_dir:
            shutil.rmtree(temp_dir)


class CleanResources(task.Task):
    # A task that cleans up any workflow resources.

    def execute(self, temp_dir):
        print("Removing %s" % temp_dir)
        shutil.rmtree(temp_dir)


def review_iter():
    """Makes reviews (never-ending iterator/generator)."""
    review_id_gen = itertools.count(0)
    while True:
        review_id = next(review_id_gen)
        review = {
            'id': review_id,
        }
        yield review


# The reason this is at the module namespace level is important, since it must
# be accessible from a conductor dispatching an engine, if it was a lambda
# function for example, it would not be reimportable and the conductor would
# be unable to reference it when creating the workflow to run.
def create_review_workflow():
    """Factory method used to create a review workflow to run."""
    f = linear_flow.Flow("tester")
    f.add(
        MakeTempDir(name="maker"),
        RunReview(name="runner"),
        CleanResources(name="cleaner")
    )
    return f


def generate_reviewer(client, saver, name=NAME):
    """Creates a review producer thread with the given name prefix."""
    real_name = "%s_reviewer" % name
    no_more = threading.Event()
    jb = boards.fetch(real_name, JOBBOARD_CONF,
                      client=client, persistence=saver)

    def make_save_book(saver, review_id):
        # Record what we want to happen (sometime in the future).
        book = models.LogBook("book_%s" % review_id)
        detail = models.FlowDetail("flow_%s" % review_id,
                                   uuidutils.generate_uuid())
        book.add(detail)
        # Associate the factory method we want to be called (in the future)
        # with the book, so that the conductor will be able to call into
        # that factory to retrieve the workflow objects that represent the
        # work.
        #
        # These args and kwargs *can* be used to save any specific parameters
        # into the factory when it is being called to create the workflow
        # objects (typically used to tell a factory how to create a unique
        # workflow that represents this review).
        factory_args = ()
        factory_kwargs = {}
        engines.save_factory_details(detail, create_review_workflow,
                                     factory_args, factory_kwargs)
        with contextlib.closing(saver.get_connection()) as conn:
            conn.save_logbook(book)
            return book

    def run():
        """Periodically publishes 'fake' reviews to analyze."""
        jb.connect()
        review_generator = review_iter()
        with contextlib.closing(jb):
            while not no_more.is_set():
                review = next(review_generator)
                details = {
                    'store': {
                        'review': review,
                    },
                }
                job_name = "%s_%s" % (real_name, review['id'])
                print("Posting review '%s'" % review['id'])
                jb.post(job_name,
                        book=make_save_book(saver, review['id']),
                        details=details)
                time.sleep(REVIEW_CREATION_DELAY)

    # Return the unstarted thread, and a callback that can be used
    # shutdown that thread (to avoid running forever).
    return (threading_utils.daemon_thread(target=run), no_more.set)


def generate_conductor(client, saver, name=NAME):
    """Creates a conductor thread with the given name prefix."""
    real_name = "%s_conductor" % name
    jb = boards.fetch(name, JOBBOARD_CONF,
                      client=client, persistence=saver)
    conductor = conductors.fetch("blocking", real_name, jb,
                                 engine='parallel', wait_timeout=SCAN_DELAY)

    def run():
        jb.connect()
        with contextlib.closing(jb):
            conductor.run()

    # Return the unstarted thread, and a callback that can be used
    # shutdown that thread (to avoid running forever).
    return (threading_utils.daemon_thread(target=run), conductor.stop)


def main():
    # Need to share the same backend, so that data can be shared...
    persistence_conf = {
        'connection': 'memory',
    }
    saver = persistence.fetch(persistence_conf)
    with contextlib.closing(saver.get_connection()) as conn:
        # This ensures that the needed backend setup/data directories/schema
        # upgrades and so on... exist before they are attempted to be used...
        conn.upgrade()
    fc1 = fake_client.FakeClient()
    # Done like this to share the same client storage location so the correct
    # zookeeper features work across clients...
    fc2 = fake_client.FakeClient(storage=fc1.storage)
    entities = [
        generate_reviewer(fc1, saver),
        generate_conductor(fc2, saver),
    ]
    for t, stopper in entities:
        t.start()
    try:
        watch = timeutils.StopWatch(duration=RUN_TIME)
        watch.start()
        while not watch.expired():
            time.sleep(0.1)
    finally:
        for t, stopper in reversed(entities):
            stopper()
            t.join()


if __name__ == '__main__':
    main()
