1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
|
import json
import os
from datetime import datetime
import numpy as np
import numpy.testing as nptu
import pytest
from pymongo.errors import ConfigurationError
from maggma.core import StoreError
from maggma.stores import GridFSStore, MongoStore
from maggma.stores.gridfs import GridFSURIStore, files_collection_fields
@pytest.fixture()
def mongostore():
    """Yield a connected MongoStore on the test database; drop its collection on teardown."""
    mongo_store = MongoStore("maggma_test", "test")
    mongo_store.connect()
    yield mongo_store
    mongo_store._collection.drop()
@pytest.fixture()
def gridfsstore():
    """Yield a connected GridFSStore keyed on task_id; drop both gridfs collections on teardown."""
    gfs_store = GridFSStore("maggma_test", "test", key="task_id")
    gfs_store.connect()
    yield gfs_store
    gfs_store._files_collection.drop()
    gfs_store._chunks_collection.drop()
def test_update(gridfsstore):
    """Check metadata writes, key-based overwrites, and zlib-compressed storage."""
    first = np.random.rand(256)
    second = np.random.rand(256)
    timestamp = datetime(2018, 4, 12, 16)

    # Metadata should land in the gridfs files collection
    gridfsstore.update([{"task_id": "mp-1", "data": first, gridfsstore.last_updated_field: timestamp}])
    assert gridfsstore._files_collection.find_one({"metadata.task_id": "mp-1"}) is not None

    # Updating the same key replaces the document rather than duplicating it
    gridfsstore.update([{"task_id": "mp-1", "data": second, gridfsstore.last_updated_field: timestamp}])
    assert len(list(gridfsstore.query({"task_id": "mp-1"}))) == 1
    assert "task_id" in gridfsstore.query_one({"task_id": "mp-1"})
    nptu.assert_almost_equal(gridfsstore.query_one({"task_id": "mp-1"})["data"], second, 7)

    # A compression-enabled store records the codec and round-trips the payload
    compressed_store = GridFSStore("maggma_test", "test", key="task_id", compression=True)
    compressed_store.connect()
    compressed_store.update([{"task_id": "mp-1", "data": first}])
    assert compressed_store._files_collection.find_one({"metadata.compression": "zlib"}) is not None
    nptu.assert_almost_equal(compressed_store.query_one({"task_id": "mp-1"})["data"], first, 7)
def test_remove(gridfsstore):
    """remove_docs deletes only the documents matching the criteria."""
    payload_a = np.random.rand(256)
    payload_b = np.random.rand(256)
    timestamp = datetime(2018, 4, 12, 16)
    gridfsstore.update([{"task_id": "mp-1", "data": payload_a, gridfsstore.last_updated_field: timestamp}])
    gridfsstore.update([{"task_id": "mp-2", "data": payload_b, gridfsstore.last_updated_field: timestamp}])

    # both documents are present before removal
    assert gridfsstore.query_one(criteria={"task_id": "mp-1"})
    assert gridfsstore.query_one(criteria={"task_id": "mp-2"})

    gridfsstore.remove_docs({"task_id": "mp-1"})

    # only the targeted document is gone
    assert gridfsstore.query_one(criteria={"task_id": "mp-1"}) is None
    assert gridfsstore.query_one(criteria={"task_id": "mp-2"})
def test_count(gridfsstore):
    """count reflects both the total document count and criteria-filtered matches."""
    timestamp = datetime(2018, 4, 12, 16)
    for expected, task in enumerate(["mp-1", "mp-2"], start=1):
        gridfsstore.update([{"task_id": task, "data": np.random.rand(256), gridfsstore.last_updated_field: timestamp}])
        assert gridfsstore.count() == expected
    # a criteria dict narrows the count to matching documents only
    assert gridfsstore.count({"task_id": "mp-2"}) == 1
def test_query(gridfsstore):
    """Stored payloads round-trip through query_one; unknown keys return None."""
    payload_a = np.random.rand(256)
    payload_b = np.random.rand(256)
    timestamp = datetime(2018, 4, 12, 16)
    gridfsstore.update([{"task_id": "mp-1", "data": payload_a, gridfsstore.last_updated_field: timestamp}])
    gridfsstore.update([{"task_id": "mp-2", "data": payload_b, gridfsstore.last_updated_field: timestamp}])

    nptu.assert_almost_equal(gridfsstore.query_one(criteria={"task_id": "mp-1"})["data"], payload_a, 7)
    result = gridfsstore.query_one(criteria={"task_id": "mp-2"})
    nptu.assert_almost_equal(result["data"], payload_b, 7)
    # the last-updated timestamp is surfaced on queried documents
    assert gridfsstore.last_updated_field in result
    # querying a key that was never stored yields nothing
    assert gridfsstore.query_one(criteria={"task_id": "mp-3"}) is None
def test_query_gridfs_file(gridfsstore):
    """Files written directly to gridfs (outside the store) are still queryable."""
    # mimic a pre-existing gridfs collection generated without the store
    gridfsstore._collection.put(b"hello world", task_id="mp-1")
    result = gridfsstore.query_one()
    assert result["data"].decode() == "hello world"
    assert result[gridfsstore.key] == "mp-1"
def test_last_updated(gridfsstore):
    """last_updated tracks the newest timestamp across all stored documents."""
    blob_a = np.random.rand(256)
    blob_b = np.random.rand(256)

    older = datetime(2018, 4, 12, 16)
    gridfsstore.update([{"task_id": "mp-1", "data": blob_a, gridfsstore.last_updated_field: older}])
    gridfsstore.update([{"task_id": "mp-2", "data": blob_b, gridfsstore.last_updated_field: older}])
    assert gridfsstore.last_updated == older

    newer = datetime(2019, 6, 12, 16)
    gridfsstore.update([{"task_id": "mp-3", "data": blob_b, gridfsstore.last_updated_field: newer}])
    assert gridfsstore.last_updated == newer

    # inserting an older document must not move last_updated backwards
    oldest = datetime(2017, 6, 12, 16)
    gridfsstore.update([{"task_id": "mp-4", "data": blob_b, gridfsstore.last_updated_field: oldest}])
    assert gridfsstore.last_updated == newer
def test_groupby(gridfsstore):
    """groupby partitions documents by a metadata field value."""
    timestamp = datetime(2018, 4, 12, 16)
    # mp-0..mp-2 get a=1, mp-3..mp-6 get a=2 (same docs/order as separate loops)
    for idx in range(7):
        gridfsstore.update(
            [{"task_id": f"mp-{idx}", "a": 1 if idx < 3 else 2, gridfsstore.last_updated_field: timestamp}],
            key=["task_id", "a"],
        )

    groups = list(gridfsstore.groupby("a"))
    assert len(groups) == 2
    assert {grp[0]["a"] for grp in groups} == {1, 2}

    ids_by_a = {grp["a"]: {doc["task_id"] for doc in docs} for grp, docs in groups}
    assert ids_by_a[1] == {"mp-0", "mp-1", "mp-2"}
    assert ids_by_a[2] == {"mp-3", "mp-4", "mp-5", "mp-6"}
def test_distinct(gridfsstore):
    """distinct returns the unique values of a metadata field."""
    timestamp = datetime(2018, 4, 12, 16)
    # mp-0..mp-2 get a=1, mp-3..mp-6 get a=2 (same docs/order as separate loops)
    for idx in range(7):
        gridfsstore.update(
            [{"task_id": f"mp-{idx}", "a": 1 if idx < 3 else 2, gridfsstore.last_updated_field: timestamp}],
            key=["task_id", "a"],
        )
    assert set(gridfsstore.distinct("a")) == {1, 2}
def test_eq(mongostore, gridfsstore):
    # A store equals itself, and stores of different types compare unequal.
    assert gridfsstore == gridfsstore
    assert mongostore != gridfsstore
def test_index(gridfsstore):
    """Index creation succeeds for arbitrary keys and for the gridfs file fields."""
    # "test_key" first, then the standard gridfs files-collection fields
    for field in ["test_key", *files_collection_fields]:
        assert gridfsstore.ensure_index(field)
def test_gfs_metadata(gridfsstore):
    """Metadata stored on the gridfs file is folded back into query results."""
    timestamp = datetime(2018, 4, 12, 16)
    gridfsstore.ensure_metadata = True

    # identical payload for every file; only the metadata differs
    payload = json.dumps({"a": 1}).encode("UTF-8")
    for idx in range(3):
        meta = {"task_id": f"mp-{idx}", "a": 1, gridfsstore.last_updated_field: timestamp}
        gridfsstore._collection.put(payload, metadata=meta)

    for doc in gridfsstore.query():
        assert "task_id" in doc
        assert gridfsstore.last_updated_field in doc
def test_gridfsstore_from_launchpad_file(lp_file):
    """A store built from a launchpad file connects and reports the expected name."""
    store = GridFSStore.from_launchpad_file(lp_file, collection_name="tmp")
    store.connect()
    assert store.name == "gridfs://localhost/maggma_tests/tmp"
def test_searchable_fields(gridfsstore):
    """Fields listed in searchable_fields are promoted into the index store."""
    timestamp = datetime(2018, 4, 12, 16)
    docs = [{"task_id": f"mp-{idx}", "a": idx, gridfsstore.last_updated_field: timestamp} for idx in range(3)]
    gridfsstore.searchable_fields = ["task_id"]
    gridfsstore.update(docs, key="a")
    # distinct only works here if task_id was written into the index store
    assert set(gridfsstore.distinct("task_id")) == {"mp-0", "mp-1", "mp-2"}
def test_additional_metadata(gridfsstore):
    """Fields passed as additional_metadata are promoted into the index store."""
    timestamp = datetime(2018, 4, 12, 16)
    docs = [{"task_id": f"mp-{idx}", "a": idx, gridfsstore.last_updated_field: timestamp} for idx in range(3)]
    gridfsstore.update(docs, key="a", additional_metadata="task_id")
    # distinct only works here if task_id was written into the index store
    assert set(gridfsstore.distinct("task_id")) == {"mp-0", "mp-1", "mp-2"}
@pytest.mark.skipif(
    "mongodb+srv" not in os.environ.get("MONGODB_SRV_URI", ""),
    reason="requires special mongodb+srv URI",
)
def test_gridfs_uri():
    """Connect a GridFSURIStore through a mongodb+srv URI taken from the environment."""
    uri = os.environ["MONGODB_SRV_URI"]
    store = GridFSURIStore(uri, database="mp_core", collection_name="xas")
    store.connect()
    # Compare into a local bool first so a failed assert does not echo the
    # secret URI into the pytest output. Use ``==`` (value equality) rather
    # than ``is`` (object identity), which is never reliable for strings.
    is_name = store.name == uri
    assert is_name
def test_gridfs_uri_dbname_parse():
    """The database name comes from the URI path unless explicitly overridden."""
    # dbname parsed from the URI path component
    store = GridFSURIStore("mongodb://uuu:xxxx@host:27017/fake_db", "test")
    assert store.database == "fake_db"

    # an explicit database argument wins over the URI path
    store = GridFSURIStore("mongodb://uuu:xxxx@host:27017/fake_db", "test", database="fake_db2")
    assert store.database == "fake_db2"

    # no database anywhere is a configuration error
    with pytest.raises(ConfigurationError):
        GridFSURIStore("mongodb://uuu:xxxx@host:27017", "test")
def test_close(gridfsstore):
    """A closed store raises StoreError on access until reconnected."""
    assert gridfsstore.query_one() is None
    gridfsstore.close()
    with pytest.raises(StoreError):
        gridfsstore.query_one()
    # reconnect so the fixture teardown can drop the collections
    gridfsstore.connect()
|