File: parallel_table_multiply.py

package info (click to toggle)
python-taskflow 5.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 3,564 kB
  • sloc: python: 28,241; sh: 269; makefile: 24
file content (128 lines) | stat: -rw-r--r-- 4,276 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# -*- coding: utf-8 -*-

#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import csv
import logging
import os
import random
import sys

# Only report messages of ERROR severity or above through the root logger.
logging.basicConfig(level=logging.ERROR)

# Put the directory two levels above this file at the front of the module
# search path; this has to happen before the taskflow imports further down.
# NOTE(review): presumably this lets the example use the in-tree taskflow
# package instead of an installed copy -- confirm.
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import futurist

from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: This example walks through a miniature workflow which performs a
# parallel table modification: each row in the table is adjusted by a thread
# (or by a green thread, if eventlet is available) in parallel, then the
# results are reassembled into a new table and some verifications are
# performed on it to ensure everything went as expected.


# Factor that make_flow() passes to every RowMultiplier task it creates.
# NOTE(review): the name is spelled "MULTIPLER" (sic); it is kept as-is
# because other code in this file refers to it by this exact name.
MULTIPLER = 10


class RowMultiplier(task.Task):
    """Task that scales every value of a single table row.

    The row, its position in the table (``index``) and the ``multiplier``
    are all captured when the task is constructed, so ``execute`` needs no
    arguments of its own.
    """

    def __init__(self, name, index, row, multiplier):
        super().__init__(name=name)
        self.row = row
        self.index = index
        self.multiplier = multiplier

    def execute(self):
        """Return a new list holding each row value times the multiplier."""
        factor = self.multiplier
        scaled = []
        for value in self.row:
            scaled.append(value * factor)
        return scaled


def make_flow(table):
    """Build the unordered flow that multiplies every row of ``table``.

    One ``RowMultiplier`` task named ``m-<row index>`` is added per row,
    each using the module level ``MULTIPLER`` as its factor.

    Nothing is executed here; this only describes the work. The engine
    created later in main() is what actually runs the tasks.
    """
    # An unordered flow is used on purpose: its tasks have no dependencies
    # on each other, so they can all run at the same time (bounded only by
    # the executor and its maximum number of workers).
    flow = uf.Flow("root")
    for position, values in enumerate(table):
        task_name = "m-%s" % position
        flow.add(RowMultiplier(task_name, position, values, MULTIPLER))
    return flow


def main():
    """Multiply every row of a table in parallel and sanity check the result.

    When exactly one command line argument is given it is treated as the
    path of a CSV file whose cells are parsed as floats (empty cells become
    ``0.0``); otherwise a table of random floats with between 1 and 100
    rows and between 1 and 100 columns is generated.

    Each engine state transition is printed while the flow runs.

    Returns:
        ``0`` when exactly one computed row was collected per input row,
        ``1`` otherwise.
    """
    if len(sys.argv) == 2:
        # The csv module needs a text-mode file on Python 3 (a file opened
        # with 'rb' makes csv.reader raise on the first row); newline=''
        # leaves newline handling to the reader, as the csv docs advise.
        with open(sys.argv[1], newline='') as fh:
            tbl = [[float(cell) if cell else 0.0 for cell in row]
                   for row in csv.reader(fh)]
    else:
        # Make some random table out of thin air...
        cols = random.randint(1, 100)
        rows = random.randint(1, 100)
        tbl = [[random.random() for _j in range(cols)]
               for _i in range(rows)]

    # Generate the work to be done.
    f = make_flow(tbl)

    # Now run it (using the specified executor)...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet currently active, use real threads instead.
        executor = futurist.ThreadPoolExecutor(max_workers=5)
    try:
        e = engines.load(f, engine='parallel', executor=executor)
        for st in e.run_iter():
            print(st)
    finally:
        # Always release the workers, even if loading or running failed.
        executor.shutdown()

    # Put the computed rows back into table order. Every task remembers the
    # index of the row it was created from, so sorting the tasks on that
    # index replaces a per-row search through the whole flow.
    computed_tbl = [e.storage.get(tsk.name)
                    for tsk in sorted(f, key=lambda tsk: tsk.index)]

    # Do some basic validation (which causes the return code of this process
    # to be different if things were not as expected...)
    return 1 if len(computed_tbl) != len(tbl) else 0


if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    exit_status = main()
    sys.exit(exit_status)