File: detect_pattern.py

package info (click to toggle)
python-clevercsv 0.8.4%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,080 kB
  • sloc: python: 6,211; ansic: 870; makefile: 90
file content (156 lines) | stat: -rw-r--r-- 3,655 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# -*- coding: utf-8 -*-

"""
Code for computing the pattern score.

Author: Gertjan van den Burg

"""

import collections
import re

from typing import Optional
from typing import Pattern

from .cabstraction import base_abstraction
from .cabstraction import c_merge_with_quotechar
from .dialect import SimpleDialect

# Default lower bound used by ``pattern_score`` for the ``cells - 1`` term, so
# that a row pattern with a single cell does not contribute exactly zero.
DEFAULT_EPS_PAT: float = 1e-3

# Matches a run of two or more consecutive ``C`` characters in an abstraction.
RE_MULTI_C: Pattern[str] = re.compile(r"C{2,}")


def pattern_score(
    data: str, dialect: SimpleDialect, eps: float = DEFAULT_EPS_PAT
) -> float:
    """
    Compute the pattern score of ``data`` when parsed with ``dialect``.

    The data is converted to its abstraction, which is split into row
    patterns on ``R``. Every distinct pattern contributes its number of
    occurrences multiplied by ``max(eps, cells - 1) / cells``, where
    ``cells`` is the number of ``D``-separated parts of the pattern. The sum
    of the contributions is divided by the number of distinct patterns.

    Parameters
    ----------

    data : str
        The data of the file as a raw character string

    dialect : SimpleDialect
        The dialect used to build the abstraction

    eps : float
        Lower bound for the ``cells - 1`` term of each pattern

    Returns
    -------
    score : float
        the pattern score

    """
    abstraction = make_abstraction(data, dialect)
    pattern_counts = collections.Counter(abstraction.split("R"))

    total = 0.0
    for pattern, occurrences in pattern_counts.items():
        # A pattern has one more cell than it has "D" separators.
        n_cells = pattern.count("D") + 1
        weight = max(eps, n_cells - 1) / n_cells
        total += occurrences * weight

    # The Counter is never empty: splitting any string yields >= 1 part.
    return total / len(pattern_counts)


def make_abstraction(data: str, dialect: SimpleDialect) -> str:
    """Build the abstract representation of a CSV file for a dialect.

    The base abstraction is computed from the data and the delimiter, quote
    character and escape character of the dialect. It is then passed, in
    order, through ``merge_with_quotechar``, ``fill_empties`` and
    ``strip_trailing``. The result is what the row patterns are computed
    from.

    Parameters
    ----------
    data : str
        The data of the file as a string.

    dialect : SimpleDialect
        A dialect to parse the file with.

    Returns
    -------
    abstraction : str
        An abstract representation of the CSV file.

    """
    abstraction = base_abstraction(
        data, dialect.delimiter, dialect.quotechar, dialect.escapechar
    )
    # Post-processing steps; the order matters and matches the pipeline
    # described in the docstring.
    for step in (merge_with_quotechar, fill_empties, strip_trailing):
        abstraction = step(abstraction)
    return abstraction


def merge_with_quotechar(
    S: str, dialect: Optional[SimpleDialect] = None
) -> str:
    """Collapse quoted blocks in the abstraction into single cells

    Quoted blocks (``QC...CQ``) in the abstract representation are merged
    into one cell (``C``), taking nested quotes into account. The work is
    delegated to ``c_merge_with_quotechar``.

    Parameters
    ----------
    S : str
        The data of a file as a string

    dialect : SimpleDialect
        The dialect used to make the abstraction. This is not used but kept for
        backwards compatibility. Will be removed in a future version.

    Returns
    -------
    abstraction : str
        A simplified version of the abstraction with quoted blocks merged.

    """
    # ``dialect`` is deliberately ignored; only the abstraction is needed.
    merged = c_merge_with_quotechar(S)
    return merged


def fill_empties(abstract: str) -> str:
    """Mark empty cells in the abstraction with ``C``

    The row patterns assume that empty cells are represented by the letter
    ``C`` too. This function inserts a ``C`` between two adjacent delimiters,
    between a delimiter and a following row separator, and between a row
    separator and a following delimiter. It then collapses every run of
    repeated ``C`` characters to a single ``C``, and finally adds a ``C`` when
    the abstraction starts or ends with a delimiter.

    Parameters
    ----------
    abstract : str
        The abstract representation of the file.

    Returns
    -------
    abstraction : str
        The abstract representation with empties filled.


    """
    # (adjacent pair, same pair with an empty cell inserted), applied in order.
    fillers = (("DD", "DCD"), ("DR", "DCR"), ("RD", "RCD"))
    for pair, filled in fillers:
        # Repeat until stable: ``str.replace`` skips overlapping matches, so
        # a run such as "DDD" needs more than one pass.
        while pair in abstract:
            abstract = abstract.replace(pair, filled)

    collapsed = RE_MULTI_C.sub("C", abstract)

    # A delimiter at either end of the abstraction implies an empty cell.
    prefix = "C" if collapsed.startswith("D") else ""
    suffix = "C" if collapsed.endswith("D") else ""
    return prefix + collapsed + suffix


def strip_trailing(abstract: str) -> str:
    """Strip trailing row separators from the abstraction.

    Parameters
    ----------
    abstract : str
        The abstract representation of the file.

    Returns
    -------
    abstraction : str
        The abstraction with every ``R`` at its end removed.

    """
    # ``str.rstrip`` drops the whole trailing run in one pass, rather than
    # copying the string once for each removed character.
    return abstract.rstrip("R")