
#
# This program is a stand-in for good intro docs. It just documents various
# basics of using Mitogen.
#

from __future__ import absolute_import
from __future__ import print_function

import hashlib
import io
import os

import mitogen.core
import mitogen.master
import mitogen.service
import mitogen.utils



def get_file_contents(path):
    """
    Read the file at `path` in binary mode and return everything in it.

    :param path:
        Filesystem path of the file to read.
    :returns:
        The file's bytes, wrapped in :class:`mitogen.core.Blob`.
    """
    with open(path, 'rb') as fp:
        raw = fp.read()

    # Wrap the data in mitogen.core.Blob(): a bytes subclass whose repr() is a
    # short summary of the blob instead of the raw data, which keeps logging
    # output *much* nicer. Unlike most custom types, blobs can be serialized,
    # so this is safe to use as the return value of a remote call.
    return mitogen.core.Blob(raw)


def put_file_contents(path, s):
    """
    Replace the contents of the file at `path` with the bytes `s`.

    The file is opened in binary write mode, so it is created if missing and
    truncated if it already exists.

    :param path:
        Filesystem path of the file to write.
    :param s:
        Bytes to store in the file.
    """
    with open(path, 'wb') as out:
        out.write(s)


def streamy_download_file(context, path):
    """
    Stream `path` from the FileService hosted by `context` into memory.

    :param context:
        Context hosting the FileService that should supply the file.
    :param path:
        Path of the file to request from that service.
    :returns:
        Dict with keys ``success`` and ``metadata`` (the two values returned
        by ``FileService.get()``) and ``size`` (number of bytes received).
    """
    sink = io.BytesIO()

    # FileService.get() is not actually an exposed service method, it's just a
    # classmethod that wraps up the complicated dance of implementing the
    # transfer. Received data is written to the file object we hand it.
    ok, metadata = mitogen.service.FileService.get(context, path, sink)

    received = len(sink.getvalue())
    return {
        'success': ok,
        'metadata': metadata,
        'size': received,
    }


def md5sum(path):
    """
    Return the MD5 checksum for a file.

    The file is hashed in fixed-size chunks rather than being read into memory
    in one piece, so memory use stays bounded no matter how large the file is.

    :param path:
        Filesystem path of the file to hash.
    :returns:
        The digest as a string of hexadecimal digits.
    """
    digest = hashlib.md5()
    with open(path, 'rb') as fp:
        # iter() with a sentinel keeps calling fp.read() until it returns the
        # empty bytestring that marks end-of-file.
        for chunk in iter(lambda: fp.read(131072), b''):
            digest.update(chunk)
    return digest.hexdigest()



def work_on_machine(context):
    """
    Do stuff to a remote context.

    Runs a series of demonstrations against `context`: plain function calls,
    whole-file transfers implemented as function calls, and streamed file
    transfers in both directions using mitogen.service.FileService. Progress
    and results are printed to stdout.

    Side effects: ``/tmp/blah`` is written on the machine hosting `context`,
    and ``/etc/passwd`` is read on both sides.

    :param context:
        The Mitogen context to exercise. Its ``router`` attribute is used to
        host a temporary service pool in this process.
    """
    print("Created context. Context ID is", context.context_id)

    # You don't need to understand any/all of this, but it's helpful to grok
    # the whole chain:

    # - Context.call() is a light wrapper around .call_async(), the wrapper
    #   simply blocks the caller until a reply arrives.
    # - .call_async() serializes the call signature into a message and passes
    #   it to .send_async()
    # - .send_async() creates a mitogen.core.Receiver() on the local router.
    #   The receiver constructor uses Router.add_handle() to allocate a
    #   'reply_to' handle and install a callback function that wakes the
    #   receiver when a reply message arrives.
    # - .send_async() puts the reply handle in Message.reply_to field and
    #   passes it to .send()
    # - Context.send() stamps the destination context ID into the
    #   Message.dst_id field and passes it to Router.route()
    # - Router.route() uses Broker.defer() to schedule _async_route(msg)
    #   on the Broker thread.
    # [broker thread]
    # - The broker thread wakes and calls _async_route(msg)
    # - Router._async_route() notices 'dst_id' is for a remote context and
    #   looks up the stream on which messages for dst_id should be sent (may be
    #   direct connection or not), and calls Stream.send()
    # - Stream.send() packs the message into a bytestring, appends it to
    #   Stream._output_buf, and calls Broker.start_transmit()
    # - Broker finishes work, reenters IO loop. IO loop wakes due to writeable
    #   stream.
    # - Stream.on_transmit() writes the full/partial buffer to SSH, calls
    #   stop_transmit() to mark the stream unwriteable once _output_buf is
    #   empty.
    # - Broker IO loop sleeps, no readers/writers.
    # - Broker wakes due to SSH stream readable.
    # - Stream.on_receive() called, reads the reply message, converts it to a
    #   Message and passes it to Router._async_route().
    # - Router._async_route() notices message is for local context, looks up
    #   target handle in the .add_handle() registry.
    # - Receiver._on_receive() called, appends message to receiver queue.
    # [main thread]
    # - Receiver.get() used to block the original Context.call() wakes and pops
    #   the message from the queue.
    # - Message data (pickled return value) is deserialized and returned to the
    #   caller.
    print("It's running on the local machine. Its PID is",
          context.call(os.getpid))

    # Now let's call a function defined in this module. On receiving the
    # function call request, the child attempts to import __main__, which is
    # initially missing, causing the importer in the child to request it from
    # its parent. That causes _this script_ to be sent as the module source
    # over the wire.
    print("Calling md5sum(/etc/passwd) in the child:",
          context.call(md5sum, '/etc/passwd'))

    # Now let's "transfer" a file. The simplest way to do this is calling a
    # function that returns the file data, which is totally fine for small
    # files.
    print("Download /etc/passwd via function call: %d bytes" % (
        len(context.call(get_file_contents, '/etc/passwd'))
    ))

    # And using function calls, in the other direction:
    print("Upload /tmp/blah via function call: %s" % (
        context.call(put_file_contents, '/tmp/blah', b'blah!'),
    ))

    # Now let's transfer what might be a big file. The problem with big files
    # is that they may not fit in RAM. This uses mitogen.service.FileService
    # to implement streamy file transfer instead. The sender must have a
    # 'service pool' running that will host FileService. First let's do the
    # 'upload' direction, where the master hosts FileService.

    # Steals the 'Router' reference from the context object. In a real app the
    # pool would be constructed once at startup, this is just demo code.
    file_service = mitogen.service.FileService(context.router)

    # Start the pool.
    pool = mitogen.service.Pool(context.router, services=[file_service])

    try:
        # Grant access to a file on the local disk from unprivileged contexts.
        # .register() is also exposed as a service method -- you can call it
        # on a child context from any more privileged context.
        file_service.register('/etc/passwd')

        # Now call our wrapper function that knows how to handle the transfer.
        # In a real app, this wrapper might also set ownership/modes or do any
        # other app-specific stuff relating to the file that was transferred.
        print("Streamy upload /etc/passwd: remote result: %s" % (
            context.call(
                streamy_download_file,
                # To avoid hard-wiring streamy_download_file(), we want to
                # pass it a Context object that hosts the file service it
                # should request files from. Router.myself() returns a Context
                # referring to this process. The router is reached through
                # the context because no 'router' variable is in scope here.
                context=context.router.myself(),
                path='/etc/passwd',
            ),
        ))
    finally:
        # Shut down the pool now we're done with it, else app will hang at
        # exit. Doing this in a 'finally' ensures a failed transfer cannot
        # leave the pool running. Once again, this should only happen once at
        # app startup/exit, not for every file transfer!
        pool.stop(join=True)

    # Now let's do the same thing but in reverse: we use FileService on the
    # remote to download a file. This uses context.call_service(), which
    # invokes a special code path that causes auto-initialization of a thread
    # pool in the target, and auto-construction of the target service, but
    # only if the service call was made by a more privileged context. We could
    # write a helper function that runs in the remote to do all that by hand,
    # but the library handles it for us.

    # Make the file accessible. A future FileService could avoid the need for
    # this for privileged contexts.
    context.call_service(
        service_name=mitogen.service.FileService,
        method_name='register',
        path='/etc/passwd'
    )

    # Now we can use our streamy_download_file() function in reverse -- running
    # it from this process and having it fetch from the remote process:
    print("Streamy download /etc/passwd: result: %s" % (
        streamy_download_file(context, '/etc/passwd'),
    ))


def main():
    """
    Script entry point: set up logging, a Broker and a Router, then run the
    work_on_machine() demonstration against a local child context and against
    a second context constructed via that child.
    """
    # Logging first. Mitogen produces a LOT of logging. Over the course of the
    # stable series its loggers will be carved up so more selective /
    # user-friendly logging is possible. mitogen.utils.log_to_file() just sets
    # up something basic, defaulting to INFO level, but you can override from
    # the command-line by passing MITOGEN_LOG_LEVEL=debug or
    # MITOGEN_LOG_LEVEL=io. IO logging is sometimes useful for hangs, but it
    # often creates more confusion than it solves.
    mitogen.utils.log_to_file()

    # The Broker thread manages an async IO loop listening for reads from any
    # active connection, or wakes from any non-Broker thread. Because Mitogen
    # uses a background worker thread, it is extremely important to pay
    # attention to the use of UNIX fork in your code -- forking entails making
    # a snapshot of the state of all locks in the program, including those in
    # the logging module, and thus can create code that appears to work for a
    # long time, before deadlocking randomly. Forking in a Mitogen app
    # requires significant upfront planning!
    broker = mitogen.master.Broker()

    # The Router accepts messages (mitogen.core.Message) and either dispatches
    # locally addressed messages to local handlers (added via
    # Router.add_handle()) on the broker thread, or forwards the message
    # towards the target context.
    #
    # It also acts as an uglyish God object for creating new connections. This
    # was a design mistake, really those methods should be directly imported
    # from e.g. 'mitogen.ssh'.
    router = mitogen.master.Router(broker)

    # Used as a context manager, the Router simply ensures Broker.shutdown()
    # is called on exception / exit, which prevents the app hanging due to a
    # forgotten background thread. For throwaway scripts, the decorators
    # "@mitogen.main()" and "@mitogen.utils.with_router" do the same thing
    # with less typing.
    with router:
        # Construct a context. The '.local()' constructor just creates the
        # context as a subprocess, the simplest possible case.
        first = router.local()
        print("Created a context:", first)
        print()

        # Demonstrate the standard IO redirection: calling the print function
        # in the remote context should cause a log message to be emitted. Any
        # subprocesses started by the remote also get the same treatment, so
        # it's very easy to spot otherwise discarded errors/etc. from remote
        # tools.
        first.call(print, "Hello from child.")

        # Context objects make it semi-convenient to treat the local machine
        # the same as a remote machine.
        work_on_machine(first)

        # Next, a proxied context: the .local() constructor again, but this
        # time routed via the first child, in effect constructing a
        # sub-sub-process. Instead of .local() here, we could have used
        # .sudo() or .ssh() or anything else.
        nested = router.local(via=first)

        # Three blank lines to separate the second run from the first.
        print("\n\n")
        print("Created a context as a child of another context:", nested)

        # Do everything again with the new child.
        work_on_machine(nested)

        # Individual children can be selectively shut down if we want:
        nested.shutdown(wait=True)

        # Or we can simply fall off the end of the scope, effectively calling
        # Broker.shutdown(), which causes all children to die as part of
        # shutdown.


# The child module importer detects the execution guard below and removes any
# code appearing after it, and refuses to execute "__main__" if it is absent.
# This is necessary to prevent a common problem where people try to call
# functions defined in __main__ without first wrapping it up to be importable
# as a module, which previously hung the target, or caused bizarre recursive
# script runs.
#
# In practice: everything a child may need to import from this script must be
# defined above the guard, and only the script entry point belongs below it.
if __name__ == '__main__':
    main()
