File: simple_builder.md

# Writing a Builder

## Builder Architecture

A `Builder` is a class that inherits from `maggma.core.Builder` and implements three methods:

* `get_items`: This method should return an iterable of items to run through `process_item`.
* `process_item`: This method should take a single item, process it, and return the processed item.
* `update_targets`: This method should take a list of processed items and update the target stores.

To make this less abstract, we will write a builder that multiplies the "a" sub-document by a pre-configured `multiplier`. Let's assume we have some source collection in MongoDB with documents that look like this:

``` json
{
    "id": 1,
    "a": 3,
    "last_updated": "2019-11-3"
}
```

## Class definition and `__init__`

A simple class definition for a Maggma-based builder looks like this:

``` python
from maggma.core import Builder
from maggma.core import Store

class MultiplyBuilder(Builder):
    """
    Simple builder that multiplies the "a" sub-document by a pre-set value
    """
```

The `__init__` for a builder can have any set of parameters. Generally, you want a source `Store` and a target `Store` along with any parameters that configure the builder. Due to the `MSONable` pattern, any parameters to `__init__` have to be stored as attributes. A simple `__init__` would look like this:

``` python
    def __init__(self, source: Store, target: Store, multiplier: int = 2, **kwargs):
        """
        Arguments:
            source: the source store
            target: the target store
            multiplier: the multiplier to apply to "a" sub-document
        """
        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.kwargs = kwargs

        super().__init__(sources=source, targets=target, **kwargs)
```

Python type annotations provide a nice way of documenting the types we expect and enable later type checking with `mypy`. We defined the type for `source` and `target` as `Store` since we only care that they implement that pattern. How exactly these `Store`s operate doesn't concern us here.

Note that the `__init__` arguments: `source`, `target`, `multiplier`, and `kwargs` get saved as attributes:

``` python
        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.kwargs = kwargs
```

Finally, we want to call the base `Builder`'s `__init__` to tell it the sources and targets for this builder. In addition, we pass along any extra parameters that might configure the base builder class.

``` python
super().__init__(sources=source, targets=target, **kwargs)
```

Calling the parent class `__init__` is a good practice as sub-classing builders is a good way to encapsulate complex logic.
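Storing every `__init__` argument as an attribute is what lets the `MSONable` pattern serialize a builder to a dict of its constructor arguments and rebuild an equivalent builder later. A toy sketch of that round-trip idea (plain Python for illustration, not the real `MSONable` machinery):

``` python
class SerializableBuilder:
    """Toy illustration of the MSONable idea: every __init__ argument
    is kept as an attribute so the object can round-trip via a dict."""

    def __init__(self, multiplier: int = 2):
        self.multiplier = multiplier  # saved so as_dict() can see it

    def as_dict(self) -> dict:
        # Serialize the object as its constructor arguments
        return {"multiplier": self.multiplier}

    @classmethod
    def from_dict(cls, d: dict) -> "SerializableBuilder":
        # Reconstruct an equivalent object from the serialized arguments
        return cls(**d)


b = SerializableBuilder(multiplier=3)
b2 = SerializableBuilder.from_dict(b.as_dict())
print(b2.multiplier)  # 3
```

If an `__init__` argument were not saved as an attribute, `as_dict` would have no way to recover it, and the rebuilt builder would silently differ from the original.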

## `get_items`

`get_items` is conceptually a simple method to implement, but in practice can easily be more code than the rest of the builder. All of the logic for getting data from the sources has to happen here, which requires some planning. `get_items` should also sort all of the data into individual **items** to process. This simple builder has a very easy `get_items`:

``` python

    def get_items(self) -> Iterator:
        """
        Gets individual documents to multiply
        """

        return self.source.query()
```

Here, `get_items` just returns the result of `query()` from the source store. It could also have been written as a generator:

``` python

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """
        for doc in self.source.query():
            yield doc
```

We could also have returned a list of items:

``` python

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """
        docs = list(self.source.query())
        return docs
```

One advantage of the generator approach is that it is less memory intensive than returning a list of items. For large datasets, building a list of all items to process may be prohibitive due to memory constraints.
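The difference is easy to see with the standard library's `sys.getsizeof`: a list holds references to every item at once, while a generator object stays small no matter how many items it will eventually yield. This is a plain-Python illustration, not maggma-specific:

``` python
import sys

# A list materializes every item up front...
docs_list = [{"id": i, "a": i} for i in range(100_000)]

# ...while a generator produces one item at a time, on demand.
def docs_gen():
    for i in range(100_000):
        yield {"id": i, "a": i}

# The generator object itself stays tiny regardless of how many
# items it will eventually yield.
print(sys.getsizeof(docs_list) > sys.getsizeof(docs_gen()))  # True
```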

## `process_item`

`process_item` just has to do the parallelizable work on each item. Since the item is whatever comes out of `get_items`, you know exactly what it should be. It may be a single document, a list of documents, a mapping, a set, etc.

Our simple process item just has to multiply one field by `self.multiplier`:

``` python

    def process_item(self, item: Dict) -> Dict:
        """
        Multiplies the "a" sub-document by self.multiplier
        """
        new_item = dict(**item)
        new_item["a"] *= self.multiplier
        return new_item
```

## `update_targets`

Finally, we have to put the processed item in to the target store:

``` python

    def update_targets(self, items: List[Dict]):
        """
        Adds the processed items into the target store
        """
        self.target.update(items)
```



!!! note
    Note that whatever `process_item` returns, `update_targets` takes a `List` of these.
    For instance, if `process_item` returns `str`, then `update_targets` would look like:
    ``` python

        def update_targets(self, items: List[str]):
    ```



Putting it all together we get:

``` python
from typing import Dict, Iterable, List
from maggma.core import Builder
from maggma.core import Store

class MultiplyBuilder(Builder):
    """
    Simple builder that multiplies the "a" sub-document by a pre-set value
    """


    def __init__(self, source: Store, target: Store, multiplier: int = 2, **kwargs):
        """
        Arguments:
            source: the source store
            target: the target store
            multiplier: the multiplier to apply to "a" sub-document
        """
        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.kwargs = kwargs

        super().__init__(sources=source, targets=target, **kwargs)

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """
        docs = list(self.source.query())
        return docs

    def process_item(self, item: Dict) -> Dict:
        """
        Multiplies the "a" sub-document by self.multiplier
        """
        new_item = dict(**item)
        new_item["a"] *= self.multiplier
        return new_item


    def update_targets(self, items: List[Dict]):
        """
        Adds the processed items into the target store
        """
        self.target.update(items)
```
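To see how the three methods fit together, here is a sketch that drives them by hand against a hypothetical in-memory `FakeStore` (a minimal stand-in for a real maggma `Store`, used only for illustration; in practice a maggma runner orchestrates these calls against real stores):

``` python
from typing import Dict, Iterable, List

class FakeStore:
    """Hypothetical in-memory stand-in for a maggma Store (illustration only)."""

    def __init__(self, docs: List[Dict] = None):
        self.docs = list(docs or [])

    def query(self) -> Iterable[Dict]:
        return iter(self.docs)

    def update(self, items: List[Dict]):
        self.docs.extend(items)


# Drive the three builder steps by hand, mimicking what a maggma
# runner does: get_items -> process_item -> update_targets.
source = FakeStore([{"id": 1, "a": 3}, {"id": 2, "a": 5}])
target = FakeStore()
multiplier = 2

items = source.query()                                       # get_items
processed = [dict(d, a=d["a"] * multiplier) for d in items]  # process_item
target.update(processed)                                     # update_targets

print(target.docs)  # [{'id': 1, 'a': 6}, {'id': 2, 'a': 10}]
```

Because each step only communicates through its return value, the runner is free to parallelize `process_item` across many workers.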

## Distributed Processing

`maggma` can distribute a builder across multiple computers.

The `Builder` must have a `prechunk` method defined. `prechunk` should do a subset of `get_items` to figure out what needs to be processed and then return dictionaries that modify the `Builder` in-place to only work on each subset.

For example, to distribute the builder above, we'd first have to update it to work on a subset of keys. One pattern is to define a generic `query` argument for the builder and use it in `get_items`:

``` python
    def __init__(self, source: Store, target: Store, multiplier: int = 2, query: Optional[Dict] = None, **kwargs):
        """
        Arguments:
            source: the source store
            target: the target store
            multiplier: the multiplier to apply to "a" sub-document
            query: optional criteria restricting which documents are processed
        """
        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.query = query
        self.kwargs = kwargs

        super().__init__(sources=source, targets=target, **kwargs)

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """
        query = self.query or {}
        docs = list(self.source.query(criteria=query))
        return docs
```

Then we can define a prechunk method that modifies the `Builder` dict in place to operate on just a subset of the keys:

``` python
    from maggma.utils import grouper

    def prechunk(self, number_splits: int) -> Iterable[Dict]:
        keys = self.source.distinct(self.source.key)
        for split in grouper(keys, number_splits):
            yield {
                "query": {self.source.key: {"$in": list(split)}}
            }
```
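The splitting itself can be sketched with the standard library. The helper below approximates what `maggma.utils.grouper` does (chunk an iterable into groups of at most `n` items); the `task_id` key name is a hypothetical store key chosen for the example:

``` python
from itertools import zip_longest

def grouper(iterable, n):
    """Chunk an iterable into lists of at most n items
    (stdlib sketch; the real maggma.utils.grouper may differ in detail)."""
    args = [iter(iterable)] * n
    for group in zip_longest(*args, fillvalue=None):
        # Drop the fill values padding the final, short group
        yield [g for g in group if g is not None]


keys = ["mp-1", "mp-2", "mp-3", "mp-4", "mp-5"]

# Each chunk becomes a dict that overwrites the builder's `query`
# attribute, restricting that worker to its subset of keys.
chunks = [{"query": {"task_id": {"$in": split}}} for split in grouper(keys, 2)]
print(chunks[0])    # {'query': {'task_id': {'$in': ['mp-1', 'mp-2']}}}
print(len(chunks))  # 3
```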

When distributed processing runs, each worker's `Builder` is updated in place with one of the dictionaries yielded by `prechunk`. In this case, each builder distributed to a worker gets a modified `query` parameter so that it only runs on a subset of all possible keys.