1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
from __future__ import annotations
import logging
import os
from contextlib import ExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
import pytest
from rdflib import Graph
from rdflib.tools.chunk_serializer import serialize_in_chunks
from test.data import TEST_DATA_DIR
from test.utils import GraphHelper
from test.utils.graph import cached_graph
from test.utils.namespace import MF
from test.utils.path import ctx_chdir
# `ellipsis` is referenced only inside type annotations (the `...` sentinel
# accepted by the parametrized test in this module), so it is imported for
# type checkers only and this import is never executed at runtime.
if TYPE_CHECKING:
    from builtins import ellipsis

# Module-level logger used for the debug output emitted by the tests.
logger = logging.getLogger(__name__)
def test_chunk_by_triples(tmp_path: Path):
    """Chunk the TriG manifest graph by triple count and round-trip it.

    The graph is written to ``tmp_path`` with ``max_triples=100``; the test
    then checks the number of ``.nt`` files produced and that parsing all of
    them back yields a graph isomorphic with the original.

    NOTE(review): the earlier comment here stated the graph has 2,848 triples
    and that ``math.ceil(2848 / 100) = 25``; that arithmetic gives 29, not 25.
    The asserted count (25) is kept unchanged - confirm the fixture's actual
    triple count and correct whichever figure is stale.
    """
    source_graph = cached_graph((TEST_DATA_DIR / "suites/w3c/trig/manifest.ttl",))

    serialize_in_chunks(
        source_graph,
        max_triples=100,
        file_name_stem="chunk_100",
        output_dir=tmp_path,
    )

    # Collect the chunk files once; the same listing drives both checks.
    chunk_files = list(tmp_path.glob("*.nt"))
    assert len(chunk_files) == 25

    # Re-assemble every chunk into one graph and compare with the source.
    rebuilt_graph = Graph()
    for chunk_file in chunk_files:
        rebuilt_graph.parse(chunk_file, format="nt")

    assert source_graph.isomorphic(
        rebuilt_graph
    ), "Reconstructed graph is not isomorphic with original"
def test_chunk_by_size(tmp_path: Path):
    """Chunk the TriG manifest graph by file size and round-trip it.

    The graph is written to ``tmp_path`` with ``max_file_size_kb=50``. Every
    resulting ``.nt`` file must be smaller than 50,000 bytes, and parsing all
    of them back must yield a graph isomorphic with the original.
    """
    source_graph = cached_graph((TEST_DATA_DIR / "suites/w3c/trig/manifest.ttl",))

    serialize_in_chunks(
        source_graph,
        max_file_size_kb=50,
        file_name_stem="chunk_50k",
        output_dir=tmp_path,
    )

    # First pass: every chunk must stay strictly below the 50,000 byte limit.
    for chunk_file in tmp_path.glob("*.nt"):
        chunk_size = chunk_file.stat().st_size
        assert chunk_size < 50000

    # Second pass: re-assemble the chunks and compare with the source graph.
    rebuilt_graph = Graph()
    for chunk_file in tmp_path.glob("*.nt"):
        rebuilt_graph.parse(chunk_file, format="nt")

    assert source_graph.isomorphic(
        rebuilt_graph
    ), "Reconstructed graph is not isomorphic with original"
@pytest.mark.parametrize(
    [
        "test_graph_path",
        "max_triples",
        "max_file_size_kb",
        "write_prefixes",
        "set_output_dir",
        "expected_file_count",
    ],
    [
        # `...` for max_triples / max_file_size_kb means "do not pass this
        # argument to serialize_in_chunks". A tuple for expected_file_count is
        # an inclusive (minimum, maximum) range; an int is an exact count.
        (TEST_DATA_DIR / "defined_namespaces/mf.ttl", ..., ..., False, True, 1),
        (TEST_DATA_DIR / "defined_namespaces/mf.ttl", ..., ..., True, False, 2),
        (TEST_DATA_DIR / "defined_namespaces/mf.ttl", ..., 5, True, False, (3, 7)),
        (TEST_DATA_DIR / "defined_namespaces/mf.ttl", ..., 1, False, True, (15, 25)),
        (TEST_DATA_DIR / "defined_namespaces/mf.ttl", 10000, 1, False, True, (15, 25)),
        (TEST_DATA_DIR / "defined_namespaces/mf.ttl", 20, ..., False, True, 5),
        (TEST_DATA_DIR / "defined_namespaces/mf.ttl", 20, ..., True, True, 6),
        (TEST_DATA_DIR / "defined_namespaces/mf.ttl", 100, ..., True, True, 2),
        (TEST_DATA_DIR / "defined_namespaces/mf.ttl", 100, ..., False, True, 1),
    ],
)
def test_chunking(
    tmp_path: Path,
    test_graph_path: Path,
    max_triples: Union[ellipsis, int],
    max_file_size_kb: Union[ellipsis, int, None],
    write_prefixes: bool,
    set_output_dir: bool,
    expected_file_count: Optional[Union[int, Tuple[Optional[int], Optional[int]]]],
) -> None:
    """Serialize a graph in chunks and verify the files and the round trip.

    :param tmp_path: Directory that receives the chunk files.
    :param test_graph_path: Path of the graph to load and serialize.
    :param max_triples: Triple limit per chunk, or ``...`` to omit the
        argument from the ``serialize_in_chunks`` call.
    :param max_file_size_kb: Size limit per chunk, or ``...`` to omit the
        argument from the ``serialize_in_chunks`` call.
    :param write_prefixes: Forwarded to ``serialize_in_chunks``; when true a
        ``chunk_000000.ttl`` prefixes file is expected in the output.
    :param set_output_dir: When true ``tmp_path`` is passed as ``output_dir``;
        otherwise the call runs with ``tmp_path`` as the working directory.
    :param expected_file_count: Exact number of output files, an inclusive
        ``(minimum, maximum)`` range where either bound may be ``None``, or
        ``None`` to skip the count check.
    """
    test_graph = cached_graph((test_graph_path,))

    # Only forward the limits that the parameter set actually specifies.
    kwargs: Dict[str, Any] = {"write_prefixes": write_prefixes}
    if max_triples is not ...:
        kwargs["max_triples"] = max_triples
    if max_file_size_kb is not ...:
        kwargs["max_file_size_kb"] = max_file_size_kb
    logger.debug("kwargs = %s", kwargs)

    with ExitStack() as xstack:
        if set_output_dir:
            kwargs["output_dir"] = tmp_path
        else:
            # No explicit output directory: run from tmp_path so that output
            # written relative to the working directory still lands there.
            xstack.enter_context(ctx_chdir(tmp_path))
        serialize_in_chunks(test_graph, **kwargs)

    # Replace elided limits with the values this test assumes are the
    # defaults of serialize_in_chunks, so the checks below can use them.
    if max_file_size_kb is ...:
        max_file_size_kb = None
    if max_triples is ...:
        max_triples = 10000

    stem = "chunk"

    output_paths = {item.relative_to(tmp_path) for item in tmp_path.glob("**/*")}
    logger.debug("tmp_path = %s files = %s", tmp_path, output_paths)

    file_count = len(output_paths)
    if isinstance(expected_file_count, tuple):
        minimum, maximum = expected_file_count
        if minimum is not None:
            assert minimum <= file_count
        if maximum is not None:
            assert maximum >= file_count
    elif isinstance(expected_file_count, int):
        assert expected_file_count == file_count

    recovered_graph = Graph(bind_namespaces="none")

    if write_prefixes:
        # The prefixes file is checked separately and then removed from the
        # set so that only the N-Triples chunks remain.
        prefixes_path = Path(f"{stem}_000000.ttl")
        assert prefixes_path in output_paths
        output_paths.remove(prefixes_path)
        recovered_graph.parse(tmp_path / prefixes_path, format="turtle")
        namespaces_data = (tmp_path / prefixes_path).read_text("utf-8")
        assert f"{MF}" in namespaces_data

    if len(output_paths) == 1:
        # A single data file is expected to be named "<stem>_all.nt".
        all_file_name = Path(f"{stem}_all.nt")
        assert all_file_name in output_paths
        all_file = tmp_path / all_file_name
        file_bytes = all_file.read_bytes()
        recovered_graph.parse(all_file, format="nt")
        if isinstance(max_file_size_kb, int):
            assert len(file_bytes) <= (max_file_size_kb * 1000)
        elif isinstance(max_triples, int):
            assert len(recovered_graph) <= max_triples
    elif max_file_size_kb is not None:
        # Size-limited chunks: each file must fit within the byte budget.
        assert isinstance(max_file_size_kb, int)
        for relative_path in output_paths:
            output_path = tmp_path / relative_path
            file_bytes = output_path.read_bytes()
            assert len(file_bytes) <= (max_file_size_kb * 1000)
            logger.debug("reading %s", output_path)
            recovered_graph.parse(output_path, format="nt")
    else:
        # Triple-limited chunks: each file may add at most max_triples
        # triples to the recovered graph.
        assert isinstance(max_triples, int)
        for relative_path in output_paths:
            output_path = tmp_path / relative_path
            triple_count = len(recovered_graph)
            logger.debug("reading %s", output_path)
            recovered_graph.parse(output_path, format="nt")
            extra_triples = len(recovered_graph) - triple_count
            assert extra_triples <= max_triples

    logger.debug("checking isomorphism")
    GraphHelper.assert_isomorphic(test_graph, recovered_graph)
|