1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
# Group Builder
Another advanced template in `maggma` is the `GroupBuilder`, which groups documents together before applying your function on the group of items. Just like `MapBuilder`, `GroupBuilder` also handles incremental building, keeping track of errors, getting only the data you need, and managing timeouts. GroupBuilder won't delete orphaned documents since that reverse relationship isn't valid.
Let's create a simple `ResupplyBuilder`, which will look at the inventory of items and determine what items need resupply. The source document will look something like this:
``` JSON
{
"name": "Banana",
"type": "fruit",
"quantity": 20,
"minimum": 10,
"last_updated": "2019-11-3T19:09:45"
}
```
Our builder should give us documents that look like this:
``` JSON
{
"names": ["Grapes", "Apples", "Bananas"],
"type": "fruit",
"resupply": {
"Apples": 10,
"Bananes": 0,
"Grapes": 5
},
"last_updated": "2019-11-3T19:09:45"
}
```
To begin, we define our `GroupBuilder`:
``` python
from maggma.builders import GroupBuilder
from maggma.core import Store
class ResupplyBuilder(GroupBuilder):
"""
Simple builder that determines which items to resupply
"""
def __init__(inventory: Store, resupply: Store,resupply_percent : int = 100, **kwargs):
"""
Arguments:
inventory: current inventory information
resupply: target resupply information
resupply_percent: the percent of the minimum to include in the resupply
"""
self.inventory = inventory
self.resupply = resupply
self.resupply_percent = resupply_percent
self.kwargs = kwargs
super().__init__(source=inventory, target=resupply, grouping_properties=["type"], **kwargs)
```
Note that unlike the previous `MapBuilder` example, we didn't call the source and target stores as such. Providing more useful names is a good idea in writing builders to make it clearer what the underlying data should look like.
`GroupBuilder` inherits from `MapBuilder` so it has the same configurational parameters.
- query: A query to apply to items in the source Store.
- projection: list of the fields you want to project. This can reduce the data transfer load if you only need certain fields or sub-documents from the source documents
- timeout: optional timeout on the process function
- store_process_timeout: adds the process time into the target document for profiling
- retry_failed: retries running the process function on previously failed documents
One parameter that doesn't work in `GroupBuilder` is `delete_orphans`, since the Many-to-One relationship makes determining orphaned documents very difficult.
Finally let's get to the hard part which is running our function. We do this by defining `unary_function`
``` python
def unary_function(self, items: List[Dict]) -> Dict:
resupply = {}
for item in items:
if item["quantity"] > item["minimum"]:
resupply[item["name"]] = int(item["minimum"] * self.resupply_percent )
else:
resupply[item["name"]] = 0
return {"resupply": resupply}
```
Just as in `MapBuilder`, we're not returning all the extra information typically kept in the originally item. Normally, we would have to write code that copies over the source `key` and convert it to the target `key`. Same goes for the `last_updated_field`. `GroupBuilder` takes care of this, while also recording errors, processing time, and the Builder version.`GroupBuilder` also keeps a plural version of the `source.key` field, so in this example, all the `name` values will be put together and kept in `names`
|