File: free_text.py

package info (click to toggle)
pystac-client 0.9.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 47,416 kB
  • sloc: python: 4,652; sh: 74; makefile: 60
file content (71 lines) | stat: -rw-r--r-- 2,374 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""Client-side free-text search filtering as described in `OGC API - Features - Part 9:
Text Search <https://docs.ogc.org/DRAFTS/24-031.html#q-parameter>`__

Uses the `SQLite FTS5 Extension <https://www.sqlite.org/fts5.html>`__ to implement
free-text search filtering.
"""

import re
import sqlite3


def parse_query_for_sqlite(q: str) -> str:
    """Translate an OGC Features API free-text search query into the SQLite text search
    syntax
    """
    # separate out search terms, quoted exact phrases, commas, and exact phrases
    tokens = [token.strip() for token in re.findall(r'"[^"]*"|,|[\(\)]|[^,\s\(\)]+', q)]

    # special characters that need to be escaped or quoted for sqlite fts5
    special_chars = set("-@&:^~<>=")

    for i, token in enumerate(tokens):
        if token.startswith("+"):
            tokens[i] = token[1:].strip()
        elif token.startswith("-"):
            tokens[i] = "NOT " + token[1:].strip()
        elif token == ",":
            tokens[i] = "OR"
        elif any(char in token for char in special_chars):
            # Escape any existing double quotes in the token
            escaped_token = token.replace('"', '""')
            tokens[i] = f'"{escaped_token}"'

    return " ".join(tokens)


def sqlite_text_search(q: str, text_fields: dict[str, str]) -> bool:
    """Perform a free-text search against a set of text fields for a single
    collection to determine if that collection matches the query.

    Creates an in-memory SQLite database with a single table and a single row
    then runs the MATCH query to determine if the row matches the search
    criteria.
    """
    column_clause = ", ".join(text_fields.keys())
    value_clause = ", ".join(["?" for _ in text_fields.keys()])

    with sqlite3.connect(":memory:") as conn:  # Use an in-memory database
        cursor = conn.cursor()

        cursor.execute(
            f"""
        CREATE VIRTUAL TABLE collections USING fts5({column_clause});
        """
        )

        cursor.execute(
            f"""
        INSERT INTO collections ({column_clause}) VALUES ({value_clause});
        """,
            tuple(text_fields.values()),
        )

        cursor.execute(
            f"""
        SELECT COUNT(*)
        FROM collections WHERE collections MATCH '{parse_query_for_sqlite(q)}';
        """
        )

        return bool(cursor.fetchone()[0])