# test_analysis.py — tests for dials.algorithms.correlation.analysis
# (dials 3.25.0; non-code package-browser residue removed from extracted copy)
from __future__ import annotations

import json
import pathlib

import pytest

from dials.algorithms.correlation.analysis import CorrelationMatrix
from dials.command_line.correlation_matrix import phil_scope
from dials.util.multi_dataset_handling import (
    assign_unique_identifiers,
    parse_multiple_datasets,
)
from dials.util.options import ArgumentParser, reflections_and_experiments_from_files


@pytest.fixture()
def proteinase_k(dials_data):
    """Yield (experiments, reflections, params) for four proteinase K sweeps.

    Builds command-line-style arguments pointing at the four
    experiment/reflection pairs in the ``vmxi_proteinase_k_sweeps`` dataset,
    parses them through the ``dials.correlation_matrix`` phil scope, flattens
    any multi-dataset reflection tables, and assigns unique identifiers so
    the datasets can be cross-referenced in the tests below.
    """
    mcp = dials_data("vmxi_proteinase_k_sweeps", pathlib=True)
    parser = ArgumentParser(
        usage=" ",
        phil=phil_scope,
        read_reflections=True,
        read_experiments=True,
        check_format=False,
        epilog=" ",
    )
    input_data = []
    for i in range(4):
        input_data.append(str(mcp / f"experiments_{i}.expt"))
        input_data.append(str(mcp / f"reflections_{i}.refl"))

    # parse_args produces the params actually used; the earlier bare
    # phil_scope.extract() in the original was dead code and is removed.
    params, options, args = parser.parse_args(
        args=input_data, show_diff_phil=False, return_unhandled=True
    )

    params.output.json = "dials.correlation_matrix.json"

    reflections, experiments = reflections_and_experiments_from_files(
        params.input.reflections, params.input.experiments
    )

    # A single reflection file may hold several datasets; split into one
    # table per experiment so the two lists pair up one-to-one.
    reflections = parse_multiple_datasets(reflections)
    assert len(experiments) == len(reflections)
    assert len(experiments) > 1
    experiments, reflections = assign_unique_identifiers(experiments, reflections)
    # No teardown is needed, so a plain return is equivalent to yield here.
    return experiments, reflections, params


def test_corr_mat(proteinase_k, run_in_tmp_path):
    """Smoke test: computing the matrices writes the expected JSON file."""
    experiments, reflections, params = proteinase_k
    corr = CorrelationMatrix(experiments, reflections, params)
    corr.calculate_matrices()
    corr.output_json()
    output = pathlib.Path("dials.correlation_matrix.json")
    assert output.is_file()


def test_filtered_corr_mat(proteinase_k, run_in_tmp_path):
    """Check clustering and JSON output stay consistent after filtering.

    One dataset is removed (as dials.multiplex would do), then the cluster
    labels and the JSON report must still map numeric dataset ids back to
    their original experiment identifiers.
    """
    experiments, reflections, params = proteinase_k

    # Gather the numeric-id -> experiment-identifier mapping from all tables.
    id_map = {}
    for refl_table in reflections:
        id_map.update(refl_table.experiment_identifiers())

    # Simulate filtered dataset by multiplex: drop dataset 2 everywhere.
    removed_identifiers = [id_map.pop(2)]
    reflections.pop(2)
    experiments.remove_on_experiment_identifiers(removed_identifiers)

    corr = CorrelationMatrix(experiments, reflections, params, id_map)
    corr.calculate_matrices()
    corr.output_json()
    json_path = pathlib.Path("dials.correlation_matrix.json")
    assert json_path.is_file()

    expected_ids = [[0, 3], [0, 1, 3]]

    # The in-memory clustering must reflect the filtered dataset labels.
    for cluster, expected in zip(corr.correlation_clusters, expected_ids):
        assert cluster.labels == expected

    # The JSON report must agree with the in-memory clustering.
    with open(json_path) as f:
        data = json.load(f)

    clustering = data["correlation_matrix_clustering"]
    assert len(clustering) == len(expected_ids)
    for key, expected in zip(clustering, expected_ids):
        datasets = clustering[key]["datasets"]
        assert len(datasets) == len(expected)
        for actual, numeric_id in zip(datasets, expected):
            assert actual == id_map[numeric_id]