File: main.py

package info (click to toggle)
sparql-wrapper-python 2.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,228 kB
  • sloc: python: 14,201; makefile: 30
file content (157 lines) | stat: -rw-r--r-- 4,330 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python

# -*- coding: utf-8 -*-

import argparse
import json
import os
import shutil
import sys
import xml
from typing import List, Optional

import rdflib

from . import __version__
from .Wrapper import SPARQLWrapper, _allowedAuth, _allowedFormats, _allowedRequests


class SPARQLWrapperFormatter(
    argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter
):
    """Help formatter that combines two stock argparse formatters.

    It inherits from ``ArgumentDefaultsHelpFormatter`` and
    ``RawDescriptionHelpFormatter`` and adds no behavior of its own.
    """


def check_file(v: str) -> str:
    """Validate a ``--file`` value; intended as an argparse ``type=`` callback.

    Returns *v* unchanged when it names an existing regular file, or when it
    is exactly ``"-"`` (the marker for standard input).

    Raises:
        argparse.ArgumentTypeError: if *v* is neither an existing file nor
            ``"-"``.
    """
    is_stdin_marker = v == "-"
    if os.path.isfile(v) or is_stdin_marker:
        return v
    raise argparse.ArgumentTypeError(f"file '{v}' is not found")


def choicesDescriptions() -> str:
    """Build the help epilog listing the permitted FORMAT, METHOD and AUTH values.

    Each section is a heading line followed by one ``  - value`` bullet per
    entry; consecutive sections are separated by a blank line.
    """
    bullet = "\n  - "
    sections = (
        ("allowed FORMAT:", _allowedFormats),
        ("allowed METHOD:", _allowedRequests),
        ("allowed AUTH:", _allowedAuth),
    )
    rendered = [bullet.join([heading] + values) for heading, values in sections]
    return "\n\n".join(rendered)


def parse_args(test: Optional[List[str]] = None) -> argparse.Namespace:
    """Build the ``rqw`` argument parser and parse the command line.

    Exactly one of ``-f/--file`` or ``-Q/--query`` must be supplied; the
    remaining options select the response format, endpoint, request method
    and HTTP authentication.

    Args:
        test: Argument strings to parse instead of the process command line.
            When ``None`` (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        The parsed namespace with attributes ``file``, ``query``, ``format``,
        ``endpoint``, ``method``, ``auth``, ``username``, ``password`` and
        ``quiet``.

    Note:
        As usual for argparse, invalid or missing arguments (and ``-h`` /
        ``-V``) terminate via ``SystemExit`` rather than returning.
    """

    def make_formatter(prog: str) -> SPARQLWrapperFormatter:
        # Fit the help text to the current terminal width, falling back to
        # 120 columns when the size cannot be determined.
        return SPARQLWrapperFormatter(
            prog,
            width=shutil.get_terminal_size(fallback=(120, 50)).columns,
            max_help_position=30,
        )

    parser = argparse.ArgumentParser(
        prog="rqw",
        formatter_class=make_formatter,
        description="sparqlwrapper CLI",
        epilog=choicesDescriptions(),
    )

    # The query comes either from a file (or stdin) or from a literal string,
    # never both, and one of them is mandatory.
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument(
        "-f",
        "--file",
        metavar="FILE",
        type=check_file,
        help="query with sparql file (stdin: -)",
    )
    input_group.add_argument("-Q", "--query", metavar="QUERY", help="query with string")

    parser.add_argument(
        "-F",
        "--format",
        default="json",
        metavar="FORMAT",
        choices=_allowedFormats,
        help="response format",
    )
    parser.add_argument(
        "-e",
        "--endpoint",
        metavar="URI",
        help="sparql endpoint",
        default="http://dbpedia.org/sparql",
    )
    parser.add_argument(
        "-m",
        "--method",
        metavar="METHOD",
        choices=_allowedRequests,
        help="request method",
    )
    parser.add_argument(
        "-a", "--auth", metavar="AUTH", choices=_allowedAuth, help="HTTP auth"
    )
    parser.add_argument(
        "-u", "--username", metavar="ID", default="guest", help="username for auth"
    )
    parser.add_argument(
        "-p", "--password", metavar="PW", default="", help="password for auth"
    )
    parser.add_argument("-q", "--quiet", action="store_true", help="suppress warnings")
    parser.add_argument(
        "-V", "--version", action="version", version=f"%(prog)s {__version__}"
    )

    # ``parse_args(None)`` already means "use sys.argv[1:]", so no separate
    # branch is needed for the default case.
    return parser.parse_args(test)


def main(test: Optional[List[str]] = None) -> None:
    """Run the ``rqw`` command line: execute one SPARQL query and print the result.

    The query text is taken from ``--query``, from standard input (when
    ``--file`` is ``-``) or from the named file. The converted response is
    written to standard output in a form that depends on its Python type.

    Args:
        test: Argument strings forwarded to ``parse_args`` instead of the
            process command line; ``None`` uses the real command line.

    Raises:
        TypeError: if the converted result is not one of the handled types
            (``dict``, minidom ``Document``, ``bytes``, rdflib
            ``ConjunctiveGraph``).
    """
    # ``import xml`` alone does not load the ``xml.dom.minidom`` submodule, so
    # import the class explicitly instead of relying on some other module
    # having loaded it as a side effect.
    from xml.dom.minidom import Document as XMLDocument

    args = parse_args(test)
    if args.quiet:
        import warnings

        warnings.filterwarnings("ignore")

    if args.query is not None:
        q = args.query
    elif args.file == "-":
        q = sys.stdin.read()
    else:
        # Use a context manager so the file handle is closed deterministically.
        with open(args.file, "r") as query_file:
            q = query_file.read()

    # Requests are sent with a fixed browser-style User-Agent string.
    sparql = SPARQLWrapper(
        args.endpoint,
        agent=(
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/96.0.4664.110 Safari/537.36"
        ),
    )
    # Credentials are only applied when an auth scheme was requested.
    if args.auth is not None:
        sparql.setHTTPAuth(args.auth)
        sparql.setCredentials(args.username, args.password)
    if args.method is not None:
        sparql.setMethod(args.method)
    sparql.setQuery(q)
    sparql.setReturnFormat(args.format)
    results = sparql.query().convert()

    # Dispatch on the type of the converted result.
    if isinstance(results, dict):
        # "json"
        print(json.dumps(results, indent=4))
    elif isinstance(results, XMLDocument):
        # "xml"
        print(results.toxml())
    elif isinstance(results, bytes):
        # "csv", "tsv", "turtle", "n3"
        print(results.decode("utf-8"))
    elif isinstance(results, rdflib.graph.ConjunctiveGraph):
        # "rdf"
        print(results.serialize())
    else:
        # unknown type
        raise TypeError(f"Unsupported result of type {type(results)}: {results!r}")


# Script entry point: run the CLI when this file is executed as the main module.
# NOTE(review): the relative imports at the top of this file need package
# context, so this presumably only works when launched with ``python -m``
# from the enclosing package — confirm.
if __name__ == "__main__":
    main()