File: test_copy_builder.py

package info (click to toggle)
python-maggma 0.70.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,416 kB
  • sloc: python: 10,150; makefile: 12
file content (144 lines) | stat: -rw-r--r-- 4,287 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Tests for CopyBuilder
"""

from datetime import datetime, timedelta

import pytest

from maggma.builders import CopyBuilder
from maggma.stores import MemoryStore


@pytest.fixture()
def source():
    """Provide a connected ``MemoryStore`` named "source".

    The store is keyed on ``k`` and uses ``lu`` as its last-updated field;
    an index is requested on both fields before the store is handed out.
    """
    memory_store = MemoryStore("source", key="k", last_updated_field="lu")
    memory_store.connect()
    for indexed_field in ("k", "lu"):
        memory_store.ensure_index(indexed_field)
    return memory_store


@pytest.fixture()
def target():
    """Provide a connected ``MemoryStore`` named "target".

    Configured the same way as the ``source`` fixture: keyed on ``k``,
    ``lu`` as the last-updated field, with an index requested on each.
    """
    destination = MemoryStore("target", key="k", last_updated_field="lu")
    destination.connect()
    for indexed_field in ("k", "lu"):
        destination.ensure_index(indexed_field)
    return destination


@pytest.fixture(scope="module")
def now():
    """Return a single naive UTC timestamp shared across this module's tests.

    ``datetime.utcnow()`` is deprecated as of Python 3.12, so the value is
    read from an aware UTC clock and the tzinfo is then dropped. The result
    is still a naive datetime expressed in UTC, as ``utcnow()`` returned.
    """
    # Imported locally so this fixture does not depend on the file-level imports.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)


@pytest.fixture()
def old_docs(now):
    """Build twenty documents (``k`` 0 through 19) stamped with ``now`` and valued "old"."""
    documents = []
    for key in range(20):
        documents.append({"lu": now, "k": key, "v": "old"})
    return documents


@pytest.fixture()
def new_docs(now):
    """Build ten documents (``k`` 0 through 9) stamped one second after ``now`` and valued "new"."""
    one_second_later = now + timedelta(seconds=1)
    return [
        {"lu": one_second_later, "k": key, "v": "new"}
        for key in range(10)
    ]


@pytest.fixture()
def some_failed_old_docs(now):
    """Build five "old" documents carrying ``state == "failed"``.

    The keys are 0, 1, 2 followed by 18, 19, all stamped with ``now``.
    """
    failed_keys = [*range(3), *range(18, 20)]
    return [
        {"lu": now, "k": key, "v": "old", "state": "failed"}
        for key in failed_keys
    ]


def test_get_items(source, target, old_docs, some_failed_old_docs):
    """Check how many items ``get_items`` yields under several builder settings.

    Covers the default builder, a ``projection`` restricted to the key field,
    ``retry_failed=True``, and a ``query`` filter on ``k``.
    """

    def pending_count(candidate):
        # get_items() is consumed into a list so its length can be taken.
        return len(list(candidate.get_items()))

    copier = CopyBuilder(source, target)
    source.update(old_docs)
    # Nothing in the target yet: every source document is expected.
    assert pending_count(copier) == len(old_docs)

    # Target now holds the same documents: nothing is expected.
    target.update(old_docs)
    assert pending_count(copier) == 0

    # With the target emptied and only "k" projected, all documents come
    # back and none of them carries the "v" field.
    copier = CopyBuilder(source, target, projection=["k"])
    target.remove_docs({})
    assert pending_count(copier) == len(old_docs)
    assert not any("v" in doc for doc in copier.get_items())

    # Load the failed-state documents (keys 0-2 and 18-19) into both stores.
    source.update(some_failed_old_docs)
    target.update(old_docs)
    target.update(some_failed_old_docs)
    copier = CopyBuilder(source, target)

    assert pending_count(copier) == 0

    # retry_failed=True is expected to yield exactly the failed documents.
    copier = CopyBuilder(source, target, retry_failed=True)
    assert pending_count(copier) == len(some_failed_old_docs)

    copier = CopyBuilder(source, target, query={"k": {"$lt": 11}})
    assert pending_count(copier) == 0

    # Of the failed keys, only 0, 1 and 2 satisfy k < 11.
    copier = CopyBuilder(source, target, retry_failed=True, query={"k": {"$lt": 11}})
    assert pending_count(copier) == 3


def test_process_item(source, target, old_docs):
    """Check that ``process_item`` can be applied to every item from ``get_items``.

    The length comparison only confirms that one result is produced per
    item; the contents of the processed items are not inspected.
    """
    copier = CopyBuilder(source, target)
    source.update(old_docs)
    pending = list(copier.get_items())
    processed = [copier.process_item(entry) for entry in pending]
    assert len(pending) == len(processed)


def test_update_targets(source, target, old_docs, new_docs):
    """Check that ``update_targets`` writes to the target and later writes win.

    Key 0 appears in both batches and must end up "new"; key 10 appears only
    in ``old_docs`` and must stay "old".
    """
    copier = CopyBuilder(source, target)
    for batch in (old_docs, new_docs):
        copier.update_targets(batch)

    for key, expected_value in ((0, "new"), (10, "old")):
        assert target.query_one(criteria={"k": key})["v"] == expected_value


def test_run(source, target, old_docs, new_docs):
    """Run the builder end to end and check the resulting target contents.

    The source holds old documents with keys 0-9 overwritten by newer ones;
    the target starts with only the old documents.
    """
    for batch in (old_docs, new_docs):
        source.update(batch)
    target.update(old_docs)

    copier = CopyBuilder(source, target)
    copier.run()

    # NOTE(review): the target is reconnected before querying — presumably
    # because run() closes its stores; confirm against the builder code.
    result_store = copier.target
    result_store.connect()
    for key, expected_value in ((0, "new"), (10, "old")):
        assert result_store.query_one(criteria={"k": key})["v"] == expected_value


def test_query(source, target, old_docs, new_docs):
    """Check that the builder's ``query`` attribute limits what gets copied.

    With ``k > 5`` and source keys 0-19, the target is expected to hold the
    14 documents with keys 6 through 19.
    """
    copier = CopyBuilder(source, target)
    copier.query = {"k": {"$gt": 5}}
    source.update(old_docs)
    source.update(new_docs)
    copier.run()

    copied = list(target.query(criteria={}))
    assert len(copied) == 14
    smallest_key = min(doc["k"] for doc in copied)
    assert smallest_key == 6


def test_delete_orphans(source, target, old_docs, new_docs):
    """Check that ``delete_orphans=True`` removes target docs missing from the source.

    Keys 0-4 are deleted from the source before the run; afterwards the
    target must contain none of them, while the other keys are still copied.
    """
    copier = CopyBuilder(source, target, delete_orphans=True)
    for batch in (old_docs, new_docs):
        source.update(batch)
    target.update(old_docs)

    # Remove keys 0-4 straight from the source's underlying collection.
    orphan_filter = {"k": {"$in": [0, 1, 2, 3, 4]}}
    source._collection.delete_many(orphan_filter)
    copier.run()

    leftover = target._collection.count_documents(orphan_filter)
    assert leftover == 0
    for key, expected_value in ((5, "new"), (10, "old")):
        assert target.query_one(criteria={"k": key})["v"] == expected_value


def test_prechunk(source, target, old_docs, new_docs):
    """Check that ``prechunk(2)`` yields two chunks, the first covering keys 0-9."""
    copier = CopyBuilder(source, target, delete_orphans=True)
    for batch in (old_docs, new_docs):
        source.update(batch)

    chunks = list(copier.prechunk(2))
    assert len(chunks) == 2

    first_chunk = chunks[0]
    expected_first = {"query": {"k": {"$in": list(range(10))}}}
    assert first_chunk == expected_first