File: the_basics.py

package info (click to toggle)
python-mitogen 0.3.25~a2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,220 kB
  • sloc: python: 21,989; sh: 183; makefile: 74; perl: 19; ansic: 18
file content (279 lines) | stat: -rw-r--r-- 11,868 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279

#
# This program is a stand-in for good intro docs. It just documents various
# basics of using Mitogen.
#

from __future__ import absolute_import
from __future__ import print_function

import hashlib
import io
import os

import mitogen.core
import mitogen.master
import mitogen.service
import mitogen.utils



def get_file_contents(path):
    """
    Read *path* and return its contents wrapped in a mitogen.core.Blob.
    """
    with open(path, 'rb') as source:
        raw = source.read()
    # Blob is a bytes subclass whose repr() shows a short summary rather
    # than the raw data, which keeps logging output readable. Unlike most
    # custom types, blobs can be serialized across contexts.
    return mitogen.core.Blob(raw)


def put_file_contents(path, s):
    """
    Overwrite *path* with the bytes in *s*.
    """
    with open(path, 'wb') as sink:
        sink.write(s)


def streamy_download_file(context, path):
    """
    Fetch *path* from the FileService hosted by *context*, returning a dict
    with 'success', 'metadata' and 'size' keys describing the transfer.
    """
    sink = io.BytesIO()

    # FileService.get() is not an exposed service method; it is a plain
    # classmethod helper that wraps up the complicated dance of actually
    # implementing the streaming transfer.
    ok, metadata = mitogen.service.FileService.get(context, path, sink)

    return dict(
        success=ok,
        metadata=metadata,
        size=len(sink.getvalue()),
    )


def md5sum(path):
    """
    Return the hexadecimal MD5 digest of the file at *path*.
    """
    # Blob is a bytes subclass, so hashlib accepts it directly.
    data = get_file_contents(path)
    return hashlib.md5(data).hexdigest()



def work_on_machine(context):
    """
    Run the demo workload against `context`: plain function calls, file
    transfer by return value, then streaming transfer via FileService in
    both directions (master-hosted upload, then remote-hosted download).

    :param context:
        An established mitogen Context (e.g. from Router.local()).
    """
    print("Created context. Context ID is", context.context_id)

    # You don't need to understand any/all of this, but it's helpful to grok
    # the whole chain:

    # - Context.call() is a light wrapper around .call_async(), the wrapper
    #   simply blocks the caller until a reply arrives.
    # - .call_async() serializes the call signature into a message and passes
    #   it to .send_async()
    # - .send_async() creates a mitogen.core.Receiver() on the local router.
    #   The receiver constructor uses Router.add_handle() to allocate a
    #   'reply_to' handle and install a callback function that wakes the
    #   receiver when a reply message arrives.
    # - .send_async() puts the reply handle in Message.reply_to field and
    #   passes it to .send()
    # - Context.send() stamps the destination context ID into the
    #   Message.dst_id field and passes it to Router.route()
    # - Router.route() uses Broker.defer() to schedule _async_route(msg)
    #   on the Broker thread.
    # [broker thread]
    # - The broker thread wakes and calls _async_route(msg)
    # - Router._async_route() notices 'dst_id' is for a remote context and
    #   looks up the stream on which messages for dst_id should be sent (may be
    #   direct connection or not), and calls Stream.send()
    # - Stream.send() packs the message into a bytestring, appends it to
    #   Stream._output_buf, and calls Broker.start_transmit()
    # - Broker finishes work, reenters IO loop. IO loop wakes due to writeable
    #   stream.
    # - Stream.on_transmit() writes the full/partial buffer to SSH, calls
    #   stop_transmit() to mark the stream unwriteable once _output_buf is
    #   empty.
    # - Broker IO loop sleeps, no readers/writers.
    # - Broker wakes due to SSH stream readable.
    # - Stream.on_receive() called, reads the reply message, converts it to a
    #   Message and passes it to Router._async_route().
    # - Router._async_route() notices message is for local context, looks up
    #   target handle in the .add_handle() registry.
    # - Receiver._on_receive() called, appends message to receiver queue.
    # [main thread]
    # - Receiver.get() used to block the original Context.call() wakes and pops
    #   the message from the queue.
    # - Message data (pickled return value) is deserialized and returned to the
    #   caller.
    print("It's running on the local machine. Its PID is",
          context.call(os.getpid))

    # Now let's call a function defined in this module. On receiving the
    # function call request, the child attempts to import __main__, which is
    # initially missing, causing the importer in the child to request it from
    # its parent. That causes _this script_ to be sent as the module source
    # over the wire.
    print("Calling md5sum(/etc/passwd) in the child:",
          context.call(md5sum, '/etc/passwd'))

    # Now let's "transfer" a file. The simplest way to do this is calling a
    # function that returns the file data, which is totally fine for small
    # files.
    print("Download /etc/passwd via function call: %d bytes" % (
        len(context.call(get_file_contents, '/etc/passwd'))
    ))

    # And using function calls, in the other direction:
    print("Upload /tmp/blah via function call: %s" % (
        context.call(put_file_contents, '/tmp/blah', b'blah!'),
    ))

    # Now lets transfer what might be a big file. The problem with big files
    # is that they may not fit in RAM. This uses mitogen.service.FileService
    # to implement streamy file transfer instead. The sender must have a
    # 'service pool' running that will host FileService. First let's do the
    # 'upload' direction, where the master hosts FileService.

    # Steals the 'Router' reference from the context object. In a real app the
    # pool would be constructed once at startup, this is just demo code.
    file_service = mitogen.service.FileService(context.router)

    # Start the pool.
    pool = mitogen.service.Pool(context.router, services=[file_service])

    # Grant access to a file on the local disk from unprivileged contexts.
    # .register() is also exposed as a service method -- you can call it on a
    # child context from any more privileged context.
    file_service.register('/etc/passwd')

    # Now call our wrapper function that knows how to handle the transfer. In a
    # real app, this wrapper might also set ownership/modes or do any other
    # app-specific stuff relating to the file that was transferred.
    print("Streamy upload /etc/passwd: remote result: %s" % (
        context.call(
            streamy_download_file,
            # To avoid hard-wiring streamy_download_file(), we want to pass it
            # a Context object that hosts the file service it should request
            # files from. Router.myself() returns a Context referring to this
            # process. The router is reached through the context object; there
            # is no module-level 'router' name in this function's scope.
            context=context.router.myself(),
            path='/etc/passwd',
        ),
    ))

    # Shut down the pool now we're done with it, else app will hang at exit.
    # Once again, this should only happen once at app startup/exit, not for
    # every file transfer!
    pool.stop(join=True)

    # Now let's do the same thing but in reverse: we use FileService on the
    # remote to download a file. This uses context.call_service(), which
    # invokes a special code path that causes auto-initialization of a thread
    # pool in the target, and auto-construction of the target service, but
    # only if the service call was made by a more privileged context. We could
    # write a helper function that runs in the remote to do all that by hand,
    # but the library handles it for us.

    # Make the file accessible. A future FileService could avoid the need for
    # this for privileged contexts.
    context.call_service(
        service_name=mitogen.service.FileService,
        method_name='register',
        path='/etc/passwd'
    )

    # Now we can use our streamy_download_file() function in reverse -- running
    # it from this process and having it fetch from the remote process:
    print("Streamy download /etc/passwd: result: %s" % (
        streamy_download_file(context, '/etc/passwd'),
    ))


def main():
    """
    Demo entry point: configure logging, build a Broker/Router pair, spawn a
    local child plus a proxied grandchild, and run the demo workload on each.
    """
    # Mitogen produces a LOT of logging. log_to_file() installs something
    # basic at INFO level; MITOGEN_LOG_LEVEL=debug or MITOGEN_LOG_LEVEL=io in
    # the environment overrides it from the command line. IO logging is
    # sometimes useful for hangs, but often creates more confusion than it
    # solves. (Over the stable series, Mitogen's loggers will be carved up so
    # more selective / user-friendly logging is possible.)
    mitogen.utils.log_to_file()

    # The Broker runs an async IO loop on a background thread, listening for
    # reads from any active connection and for wakes from non-Broker threads.
    # Because of that worker thread, be extremely careful mixing Mitogen with
    # os.fork(): forking snapshots the state of every lock in the program
    # (including logging's), so code can appear fine for ages and then
    # deadlock randomly. Forking in a Mitogen app needs upfront planning!
    broker = mitogen.master.Broker()

    # The Router accepts mitogen.core.Message objects and either dispatches
    # locally addressed ones to handlers (added via Router.add_handle()) on
    # the broker thread, or forwards them toward the target context. It also
    # acts as an uglyish God object for creating new connections -- a design
    # mistake; really those methods should live in e.g. 'mitogen.ssh'.
    router = mitogen.master.Router(broker)

    # Used as a context manager, the Router guarantees Broker.shutdown() on
    # exception/exit, so a forgotten background thread can't hang the app.
    # For throwaway scripts, the decorators "@mitogen.main()" and
    # "@mitogen.utils.with_router" do the same with less typing.
    with router:
        # .local() is the simplest constructor: the context is just a
        # subprocess of this process.
        top_child = router.local()
        print("Created a context:", top_child)
        print()

        # Standard IO redirection demo: calling print() remotely should emit
        # a log message here. Subprocesses started by the remote get the same
        # treatment, making otherwise-discarded errors from remote tools easy
        # to spot.
        top_child.call(print, "Hello from child.")

        # Context objects make it semi-convenient to treat the local machine
        # the same as a remote one.
        work_on_machine(top_child)

        # Contexts can be chained: constructing via=top_child yields a
        # sub-sub-process. Instead of .local() here, .sudo() or .ssh() or
        # anything else would work the same way.
        nested_child = router.local(via=top_child)
        print()
        print()
        print()
        print("Created a context as a child of another context:", nested_child)

        # Do everything again through the extra hop.
        work_on_machine(nested_child)

        # Individual children can be shut down selectively...
        subchild_shutdown = nested_child.shutdown
        subchild_shutdown(wait=True)

        # ...or we simply fall off the end of the scope, effectively calling
        # Broker.shutdown(), which causes all children to die as part of
        # shutdown.


# The child module importer detects the execution guard below and removes any
# code appearing after it, and refuses to execute "__main__" if it is absent.
# This is necessary to prevent a common problem where people try to call
# functions defined in __main__ without first wrapping it up to be importable
# as a module, which previously hung the target, or caused bizarre recursive
# script runs. (So: keep this guard exactly as-is.)
if __name__ == '__main__':
    main()