File: partiql.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (33 lines) | stat: -rw-r--r-- 1,184 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from typing import TYPE_CHECKING, Any, Optional

from moto.dynamodb.exceptions import MockValidationException, ResourceNotFoundException

if TYPE_CHECKING:
    from py_partiql_parser import QueryMetadata


def query(
    statement: str, source_data: dict[str, list[Any]], parameters: list[dict[str, Any]]
) -> tuple[
    list[dict[str, Any]],
    dict[str, list[tuple[Optional[dict[str, Any]], Optional[dict[str, Any]]]]],
]:
    """Execute a PartiQL statement against ``source_data``.

    The statement is handed to ``DynamoDBStatementParser`` (imported lazily
    from ``py_partiql_parser``) and the parser's result is returned unchanged.

    :param statement: the PartiQL statement text.
    :param source_data: mapping the parser is constructed with; its keys are
        also checked against the name reported by a failed lookup.
    :param parameters: forwarded as-is to the parser's ``parse`` call.
    :raises MockValidationException: when the parser reports a missing
        document whose name contains a ``.`` and the part before the first
        ``.`` is a key of ``source_data``.
    :raises ResourceNotFoundException: for every other missing-document error.
    """
    from py_partiql_parser import DynamoDBStatementParser
    from py_partiql_parser.exceptions import DocumentNotFoundException

    try:
        parser = DynamoDBStatementParser(source_data)
        return parser.parse(statement, parameters)
    except DocumentNotFoundException as dnfe:
        # Everything before the first "." is treated as the table name;
        # ``separator`` is empty when the name contains no "." at all.
        # NOTE(review): presumably dotted names have the form "table.index" -
        # confirm against py_partiql_parser.
        table_name, separator, _ = dnfe.name.partition(".")
        if separator and table_name in source_data:
            raise MockValidationException(
                message="The table does not have the specified index"
            )
        raise ResourceNotFoundException()


def get_query_metadata(statement: str) -> "QueryMetadata":
    """Return what ``DynamoDBStatementParser.get_query_metadata`` yields for ``statement``.

    ``py_partiql_parser`` is imported inside the function rather than at
    module import time.

    :param statement: the PartiQL statement text, passed to the parser as
        its ``query`` keyword argument.
    """
    from py_partiql_parser import DynamoDBStatementParser as statement_parser

    metadata = statement_parser.get_query_metadata(query=statement)
    return metadata