File: consistency.py

package info (click to toggle)
python-clevercsv 0.8.4%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,080 kB
  • sloc: python: 6,211; ansic: 870; makefile: 90
file content (220 lines) | stat: -rw-r--r-- 7,095 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# -*- coding: utf-8 -*-

"""
Detect the dialect using the data consistency measure.

Author: Gertjan van den Burg

"""

from dataclasses import dataclass
from functools import lru_cache

from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional

from . import field_size_limit
from .break_ties import tie_breaker
from .cparser_util import parse_string
from .detect_pattern import pattern_score
from .detect_type import DEFAULT_EPS_TYPE
from .detect_type import TypeDetector
from .dialect import SimpleDialect
from .potential_dialects import get_dialects


@dataclass
class ConsistencyScore:
    """Record of the scores computed for a single dialect.

    Instances hold the intermediate and final values of the data
    consistency calculation for one candidate dialect.

    Parameters
    ----------
    P : float
        The pattern score for the dialect.

    T : Optional[float]
        The type score. May be None when the computation was skipped
        for speed (i.e. the pattern score was too low to matter).

    Q : Optional[float]
        The combined consistency score (P * T). May be None when the
        type score was not computed.

    """

    P: float
    T: Optional[float]
    Q: Optional[float]


class ConsistencyDetector:
    """Detect the dialect with the data consistency measure

    This class uses the data consistency measure to detect the dialect. See the
    paper for details.

    Parameters
    ----------
    skip : bool
        Skip computation of the type score for dialects with a low pattern
        score.

    verbose : bool
        Print out the dialects considered and their scores.

    cache_capacity: int
        The size of the cache for type detection. Caching the type detection
        result greatly speeds up the computation of the consistency measure.
        The size of the cache can be changed to trade off memory use and speed.

    """

    def __init__(
        self,
        skip: bool = True,
        verbose: bool = False,
        cache_capacity: int = 100_000,
    ) -> None:
        self._skip = skip
        self._verbose = verbose
        self._type_detector = TypeDetector()
        self._cache_capacity = cache_capacity

        # NOTE: A bit ugly but allows setting the cache size dynamically.
        # Using a closure instead of decorating a method also gives each
        # instance its own cache (and avoids an lru_cache keyed on self).
        @lru_cache(cache_capacity)
        def cached_is_known_type(cell: str, is_quoted: bool) -> bool:
            return self._type_detector.is_known_type(cell, is_quoted)

        self._cached_is_known_type = cached_is_known_type

    def detect(
        self, data: str, delimiters: Optional[List[str]] = None
    ) -> Optional[SimpleDialect]:
        """Detect the dialect using the consistency measure

        Parameters
        ----------
        data : str
            The data of the file as a string

        delimiters : iterable
            List of delimiters to consider. If None, the :func:`get_delimiters`
            function is used to automatically detect this (as described in the
            paper).

        Returns
        -------
        dialect : SimpleDialect
            The detected dialect. If no dialect could be detected, returns None.

        """
        # Clear the per-call cache: type detection results for one file's
        # cells must not consume cache capacity for the next file.
        self._cached_is_known_type.cache_clear()

        # TODO: probably some optimization there too
        dialects = get_dialects(data, delimiters=delimiters)

        # TODO: This is not thread-safe and this object can simply own a Parser
        # for each dialect and set the limit directly there (we can also cache
        # the best parsing result)
        old_limit = field_size_limit(len(data) + 1)
        try:
            scores = self.compute_consistency_scores(data, dialects)
            best_dialects = ConsistencyDetector.get_best_dialects(scores)
            result: Optional[SimpleDialect] = None
            if len(best_dialects) == 1:
                result = best_dialects[0]
            else:
                result = tie_breaker(data, best_dialects)
        finally:
            # BUGFIX: restore the previous field size limit even when scoring
            # or tie breaking raises; otherwise the temporarily raised limit
            # would leak into any subsequent parsing done by the caller.
            field_size_limit(old_limit)
        return result

    def compute_consistency_scores(
        self, data: str, dialects: List[SimpleDialect]
    ) -> Dict[SimpleDialect, ConsistencyScore]:
        """Compute the consistency score for each dialect

        This function computes the consistency score for each dialect. This is
        done by first computing the pattern score for a dialect. If the class
        is instantiated with ``skip`` set to False, it also computes the type
        score for each dialect. If ``skip`` is True (the default), the type
        score is only computed if the pattern score is larger or equal to the
        current best combined score.

        Parameters
        ----------
        data : str
            The data of the file as a string

        dialects : Iterable[SimpleDialect]
            An iterable of delimiters to consider.

        Returns
        -------
        scores : Dict[SimpleDialect, ConsistencyScore]
            A map with a :class:`ConsistencyScore` object for each dialect
            provided as input.

        """
        scores: Dict[SimpleDialect, ConsistencyScore] = {}
        incumbent_score = -float("inf")
        for dialect in sorted(dialects):
            P = pattern_score(data, dialect)
            # Since T <= 1, a dialect with P below the best Q seen so far can
            # never win; skip the (expensive) type score computation for it.
            if self._skip and P < incumbent_score:
                scores[dialect] = ConsistencyScore(P, None, None)
                if self._verbose:
                    print("%15r:\tP = %15.6f\tskip." % (dialect, P))
                continue

            T = self.compute_type_score(data, dialect)
            Q = P * T
            incumbent_score = max(incumbent_score, Q)
            scores[dialect] = ConsistencyScore(P, T, Q)
            if self._verbose:
                print(
                    "%15r:\tP = %15.6f\tT = %15.6f\tQ = %15.6f"
                    % (dialect, P, T, Q)
                )
        return scores

    @staticmethod
    def get_best_dialects(
        scores: Dict[SimpleDialect, ConsistencyScore]
    ) -> List[SimpleDialect]:
        """Identify the dialects with the highest consistency score

        Dialects whose type score was skipped (``Q is None``) are never
        selected. Returns an empty list if no dialect has a computed Q.
        """
        Qmax = max(
            (score.Q for score in scores.values() if score.Q is not None),
            default=-float("inf"),
        )
        return [d for d, score in scores.items() if score.Q == Qmax]

    def compute_type_score(
        self, data: str, dialect: SimpleDialect, eps: float = DEFAULT_EPS_TYPE
    ) -> float:
        """Compute the type score

        The type score is the fraction of cells (parsed with the given
        dialect) that have a known data type, bounded below by ``eps`` so
        that the combined score Q = P * T never collapses to zero.
        """
        total = known = 0
        for row in parse_string(data, dialect, return_quoted=True):
            assert all(isinstance(cell, tuple) for cell in row)
            for cell, is_quoted in row:
                total += 1
                # bool result counts as 0/1 in the sum
                known += self._cached_is_known_type(cell, is_quoted=is_quoted)
        if not total:
            return eps
        return max(eps, known / total)


def detect_dialect_consistency(
    data: str,
    delimiters: Optional[Iterable[str]] = None,
    skip: bool = True,
    verbose: bool = False,
) -> Optional[SimpleDialect]:
    """Helper function that wraps ConsistencyDetector"""
    # Mostly kept for backwards compatibility
    detector = ConsistencyDetector(skip=skip, verbose=verbose)
    delims = None if delimiters is None else list(delimiters)
    return detector.detect(data, delimiters=delims)