# Writing a Builder
## Builder Architecture
A `Builder` is a class that inherits from `maggma.core.Builder` and implements three methods:
* `get_items`: This method should return an iterable of items to run through `process_item`
* `process_item`: This method should take a single item, process it, and return the processed item
* `update_targets`: This method should take a list of processed items and update the target stores.
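As a rough mental model, the three methods fit together like the loop below. This is only a plain-Python sketch: `SketchBuilder` and `run_serially` are hypothetical names, the chunk size is an assumption, and none of this is maggma's actual runner.

```python
from typing import Dict, Iterable, List


class SketchBuilder:
    """Plain-Python stand-in with the same three method names (not maggma's Builder)."""

    def __init__(self, source_docs: List[Dict]):
        self.source_docs = source_docs
        self.target_docs: List[Dict] = []

    def get_items(self) -> Iterable[Dict]:
        # Yield one unit of work at a time.
        yield from self.source_docs

    def process_item(self, item: Dict) -> Dict:
        # Transform a single item without touching the original.
        return {**item, "a": item["a"] * 2}

    def update_targets(self, items: List[Dict]) -> None:
        # Receive a batch of processed items and store them.
        self.target_docs.extend(items)


def run_serially(builder: SketchBuilder, chunk_size: int = 2) -> None:
    """Hypothetical driver: process items one by one and flush them in chunks."""
    chunk: List[Dict] = []
    for item in builder.get_items():
        chunk.append(builder.process_item(item))
        if len(chunk) == chunk_size:
            builder.update_targets(chunk)
            chunk = []
    if chunk:
        # Flush whatever is left over after the last full chunk.
        builder.update_targets(chunk)


sketch = SketchBuilder([{"id": 1, "a": 3}, {"id": 2, "a": 5}, {"id": 3, "a": 7}])
run_serially(sketch)
```

The point of the split is that only `process_item` needs to be safe to run in parallel; reading and writing stay in `get_items` and `update_targets`.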
To make this less abstract, we will write a builder that multiplies the "a" sub-document by a pre-configured `multiplier`. Let's assume we have some source collection in MongoDB with documents that look like this:
``` json
{
"id": 1,
"a": 3,
"last_updated": "2019-11-3"
}
```
## Class definition and `__init__`
A simple class definition for a Maggma-based builder looks like this:
``` python
from maggma.core import Builder
from maggma.core import Store


class MultiplyBuilder(Builder):
    """
    Simple builder that multiplies the "a" sub-document by pre-set value
    """
```
The `__init__` for a builder can have any set of parameters. Generally, you want a source `Store` and a target `Store` along with any parameters that configure the builder. Due to the `MSONable` pattern, any parameters to `__init__` have to be stored as attributes. A simple `__init__` would look like this:
``` python
def __init__(self, source: Store, target: Store, multiplier: int = 2, **kwargs):
    """
    Arguments:
        source: the source store
        target: the target store
        multiplier: the multiplier to apply to "a" sub-document
    """
    self.source = source
    self.target = target
    self.multiplier = multiplier
    self.kwargs = kwargs
    super().__init__(sources=source, targets=target, **kwargs)
```
Python type annotations provide a convenient way to document the types we expect and to type check later using `mypy`. We defined the type for `source` and `target` as `Store` since we only care that they implement that interface. How exactly these `Store`s operate doesn't concern us here.
Note that the `__init__` arguments `source`, `target`, `multiplier`, and `kwargs` are all saved as attributes:
``` python
self.source = source
self.target = target
self.multiplier = multiplier
self.kwargs = kwargs
```
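To see why the attribute names matter, the sketch below imitates the general idea behind an `MSONable`-style serializer using only the standard library: it reads the `__init__` parameter names and looks up attributes with the same names. `to_dict`, `Stored`, and `NotStored` are hypothetical names for illustration, and this is an assumption about the pattern rather than the real implementation.

```python
import inspect


def to_dict(obj) -> dict:
    """Hypothetical stand-in for an MSONable-style serializer."""
    params = inspect.signature(type(obj).__init__).parameters
    out = {}
    for name, param in params.items():
        # Skip `self` and the catch-all **kwargs parameter.
        if name == "self" or param.kind is inspect.Parameter.VAR_KEYWORD:
            continue
        # Raises AttributeError if the argument was not stored under its own name.
        out[name] = getattr(obj, name)
    return out


class Stored:
    def __init__(self, multiplier: int = 2):
        self.multiplier = multiplier  # same name as the argument


class NotStored:
    def __init__(self, multiplier: int = 2):
        self.factor = multiplier  # stored under a different name


stored_dict = to_dict(Stored(multiplier=3))
rebuilt = Stored(**stored_dict)  # round trip back to an equivalent object

try:
    to_dict(NotStored(multiplier=3))
    not_stored_ok = True
except AttributeError:
    not_stored_ok = False
```

In this sketch only the class that keeps its arguments under matching attribute names can be turned into a dictionary and rebuilt from it.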
Finally, we want to call the base `Builder`'s `__init__` to tell it our sources and targets for this builder. In addition, we pass along any extra parameters that might configure the base builder class.
``` python
super().__init__(sources=source,targets=target,**kwargs)
```
Calling the parent class `__init__` is a good practice as sub-classing builders is a good way to encapsulate complex logic.
## `get_items`
`get_items` is conceptually a simple method to implement, but in practice can easily be more code than the rest of the builder. All of the logic for getting data from the sources has to happen here, which requires some planning. `get_items` should also sort all of the data into individual **items** to process. This simple builder has a very easy `get_items`:
``` python
def get_items(self) -> Iterator:
    """
    Gets individual documents to multiply
    """
    return self.source.query()
```
Here, `get_items` just returns the results of `query()` from the store. It could also have been written as a generator:
``` python
def get_items(self) -> Iterable:
    """
    Gets individual documents to multiply
    """
    for doc in self.source.query():
        yield doc
```
We could have also returned a list of items:
``` python
def get_items(self) -> Iterable:
    """
    Gets individual documents to multiply
    """
    docs = list(self.source.query())
    return docs
```
One advantage of the generator approach is that it is less memory intensive than returning a list of items. For large datasets, returning a list of all items for processing may be prohibitive due to memory constraints.
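The difference can be seen in a small standard-library sketch. `fake_query` is a hypothetical stand-in for a store cursor, and the `fetched` list exists only to record how many documents have actually been produced.

```python
from typing import Dict, Iterator, List

fetched: List[int] = []  # records which documents were actually produced


def fake_query(n: int) -> Iterator[Dict]:
    """Hypothetical stand-in for a store cursor that yields documents lazily."""
    for i in range(n):
        fetched.append(i)
        yield {"id": i, "a": i}


def get_items_lazy(n: int) -> Iterator[Dict]:
    # Generator version: documents are produced only when requested.
    yield from fake_query(n)


def get_items_eager(n: int) -> List[Dict]:
    # List version: every document is materialized before returning.
    return list(fake_query(n))


lazy = get_items_lazy(1000)
first = next(lazy)
fetched_by_lazy = len(fetched)  # only the first document has been produced

fetched.clear()
eager = get_items_eager(1000)
fetched_by_eager = len(fetched)  # all documents are held in memory at once
```

The trade-off is that a list has a known length up front, which a generator does not.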
## `process_item`
`process_item` just has to do the parallelizable work on each item. Since the item is whatever comes out of `get_items`, you know exactly what it should be. It may be a single document, a list of documents, a mapping, a set, etc.
Our simple `process_item` just has to multiply one field by `self.multiplier`:
``` python
def process_item(self, item: Dict) -> Dict:
    """
    Multiplies the "a" sub-document by self.multiplier
    """
    new_item = dict(**item)
    new_item["a"] *= self.multiplier
    return new_item
```
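The copy made before multiplying means the document handed in by `get_items` is left as it was. A minimal standalone sketch of the same logic, where `multiply` is a hypothetical free function rather than the builder method:

```python
from typing import Dict


def multiply(item: Dict, multiplier: int) -> Dict:
    # Shallow copy, so the top-level "a" field of the input is not modified.
    new_item = dict(**item)
    new_item["a"] *= multiplier
    return new_item


source_doc = {"id": 1, "a": 3, "last_updated": "2019-11-3"}
result = multiply(source_doc, 2)
```

Because the copy is shallow, nested values would still be shared between the input and the output; that does not matter here since only the top-level `"a"` field is reassigned.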
## `update_targets`
Finally, we have to put the processed items into the target store:
``` python
def update_targets(self, items: List[Dict]):
    """
    Adds the processed items into the target store
    """
    self.target.update(items)
```
!!! note
    Note that whatever `process_item` returns, `update_targets` takes a `List` of these.
    For instance, if `process_item` returns `str`, then `update_targets` would look like:
    ``` python
    def update_targets(self, items: List[str]):
    ```
Putting it all together we get:
``` python
from typing import Dict, Iterable, List

from maggma.core import Builder
from maggma.core import Store


class MultiplyBuilder(Builder):
    """
    Simple builder that multiplies the "a" sub-document by pre-set value
    """

    def __init__(self, source: Store, target: Store, multiplier: int = 2, **kwargs):
        """
        Arguments:
            source: the source store
            target: the target store
            multiplier: the multiplier to apply to "a" sub-document
        """
        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.kwargs = kwargs
        super().__init__(sources=source, targets=target, **kwargs)

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """
        docs = list(self.source.query())
        return docs

    def process_item(self, item: Dict) -> Dict:
        """
        Multiplies the "a" sub-document by self.multiplier
        """
        new_item = dict(**item)
        new_item["a"] *= self.multiplier
        return new_item

    def update_targets(self, items: List[Dict]):
        """
        Adds the processed items into the target store
        """
        self.target.update(items)
```
## Distributed Processing
`maggma` can distribute a builder across multiple computers.
To support this, the `Builder` must define a `prechunk` method. `prechunk` should do a subset of the work of `get_items` to figure out what needs to be processed, and then return dictionaries that modify the `Builder` in place so that each copy works on only one subset.
For the example above, we first have to update the builder so that it can work on a subset of keys. One pattern is to define a generic `query` argument for the builder and use it in `get_items`:
``` python
# Requires: from typing import Dict, Iterable, Optional
def __init__(self, source: Store, target: Store, multiplier: int = 2, query: Optional[Dict] = None, **kwargs):
    """
    Arguments:
        source: the source store
        target: the target store
        multiplier: the multiplier to apply to "a" sub-document
        query: optional criteria restricting which source documents are processed
    """
    self.source = source
    self.target = target
    self.multiplier = multiplier
    self.query = query
    self.kwargs = kwargs
    super().__init__(sources=source, targets=target, **kwargs)

def get_items(self) -> Iterable:
    """
    Gets individual documents to multiply
    """
    query = self.query or {}
    docs = list(self.source.query(criteria=query))
    return docs
```
Then we can define a prechunk method that modifies the `Builder` dict in place to operate on just a subset of the keys:
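``` python
from math import ceil

from maggma.utils import grouper

def prechunk(self, number_splits: int) -> Iterable[Dict]:
    keys = self.source.distinct(self.source.key)
    # Chunk size that spreads the keys over at most `number_splits` chunks
    N = ceil(len(keys) / number_splits)
    for split in grouper(keys, N):
        yield {
            "query": {self.source.key: {"$in": list(split)}}
        }
```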
When distributed processing runs, it will modify the `Builder` dictionary in place using the prechunk dictionary. In this case, each builder distributed to a worker will get a modified `query` parameter that restricts it to a subset of all possible keys.
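The chunking and the per-worker override can be illustrated without a running store. In this sketch `chunked` is a hypothetical stand-in for `maggma.utils.grouper`, `base_kwargs` stands in for the serialized builder arguments, and the way the dictionaries are merged is an assumption about the mechanism, not maggma's actual code.

```python
from math import ceil
from typing import Dict, Iterator, List


def chunked(values: List, size: int) -> Iterator[List]:
    """Hypothetical stand-in for a grouper: consecutive slices of length `size`."""
    for start in range(0, len(values), size):
        yield values[start:start + size]


def prechunk_queries(keys: List, key_field: str, number_splits: int) -> Iterator[Dict]:
    if not keys:
        return  # nothing to split
    size = ceil(len(keys) / number_splits)
    for split in chunked(keys, size):
        # Each dictionary overrides the builder's `query` argument for one worker.
        yield {"query": {key_field: {"$in": list(split)}}}


base_kwargs = {"multiplier": 2, "query": None}
chunks = list(prechunk_queries([1, 2, 3, 4, 5], "id", number_splits=2))
# Merge each prechunk dictionary over the base arguments, one result per worker.
worker_kwargs = [{**base_kwargs, **chunk} for chunk in chunks]
```

Every key ends up in exactly one chunk, so the workers together cover the same documents as a single unrestricted builder.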