File: _resolvers.py

package info (click to toggle)
python-lsp-server 1.12.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 796 kB
  • sloc: python: 7,791; sh: 12; makefile: 4
file content (132 lines) | stat: -rw-r--r-- 4,390 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Copyright 2017-2020 Palantir Technologies, Inc.
# Copyright 2021- Python Language Server Contributors.

import logging
from collections import defaultdict
from time import time

from jedi.api.classes import Completion

from pylsp import lsp

# Module-level logger, named after this module; Resolver.resolve reports
# resolution failures through it.
log = logging.getLogger(__name__)


# ---- Base class
# -----------------------------------------------------------------------------
class Resolver:
    """Resolve extra data for a completion through a callback, with caching.

    The value for a completion is computed by ``callback(completion, sig)``
    where ``sig`` is the result of ``completion.get_signatures()``.  Results
    are cached only for completions whose top-level module is listed in
    ``cached_modules``.  Every cache key embeds the time bucket it was created
    in (see ``time_key``), so an entry is no longer hit once the bucket
    changes; ``clear_outdated`` removes such entries.
    """

    def __init__(self, callback, resolve_on_error, time_to_live=60 * 30) -> None:
        """Create a resolver.

        Args:
            callback: Called as ``callback(completion, signatures)``; its
                return value is the resolved value.
            resolve_on_error: Value returned by ``resolve`` when resolution
                raises.
            time_to_live: Width of one cache time bucket, in seconds.
        """
        self.callback = callback
        self.resolve_on_error = resolve_on_error
        self._cache = {}
        self._time_to_live = time_to_live
        # Maps a time bucket to the cache keys that were created in it.
        self._cache_ttl = defaultdict(set)
        # Eviction is only attempted while the time bucket is a multiple of this.
        self._clear_every = 2
        # see https://github.com/davidhalter/jedi/blob/master/jedi/inference/helpers.py#L194-L202
        self._cached_modules = {"pandas", "numpy", "tensorflow", "matplotlib"}

    @property
    def cached_modules(self):
        """Set of top-level module names whose completions are cached."""
        return self._cached_modules

    @cached_modules.setter
    def cached_modules(self, new_value):
        # Copy into a set so any iterable of names is accepted.
        self._cached_modules = set(new_value)

    def clear_outdated(self) -> None:
        """Remove every cache entry filed under a bucket older than the current one."""
        now = self.time_key()
        outdated = [bucket for bucket in self._cache_ttl if bucket < now]
        for bucket in outdated:
            for key in self._cache_ttl.pop(bucket):
                # A key that is already gone must not abort the whole sweep.
                self._cache.pop(key, None)

    def time_key(self):
        """Return the current time bucket: seconds since the epoch // time_to_live."""
        return int(time() / self._time_to_live)

    def get_or_create(self, completion: Completion):
        """Return the resolved value for ``completion``, using the cache if allowed.

        Completions without a ``full_name``, or whose top-level module is not
        in ``cached_modules``, are resolved on every call.
        """
        full_name = completion.full_name
        if not full_name or full_name.partition(".")[0] not in self._cached_modules:
            return self.resolve(completion)

        # Read the clock once: the bucket embedded in the key and the bucket the
        # key is filed under must agree, even if resolving is slow enough to
        # cross a bucket boundary.
        bucket = self.time_key()
        key = self._create_completion_id(completion, bucket)
        if key not in self._cache:
            if bucket % self._clear_every == 0:
                self.clear_outdated()

            self._cache[key] = self.resolve(completion)
            self._cache_ttl[bucket].add(key)
        return self._cache[key]

    def _create_completion_id(self, completion: Completion, time_key=None):
        """Build the cache key for ``completion``.

        Args:
            completion: The completion to identify.
            time_key: Time bucket to embed in the key; defaults to the current
                bucket.
        """
        if time_key is None:
            time_key = self.time_key()
        return (
            completion.full_name,
            completion.module_path,
            completion.line,
            completion.column,
            time_key,
        )

    def resolve(self, completion):
        """Run the callback for ``completion``.

        Returns:
            The callback's result, or ``resolve_on_error`` if fetching the
            signatures or running the callback raises.
        """
        try:
            sig = completion.get_signatures()
            return self.callback(completion, sig)
        except Exception as e:
            # Best effort: any failure is logged and replaced by the fallback.
            log.warning(
                "Something went wrong when resolving label for %s: %s",
                completion,
                e,
            )
            return self.resolve_on_error


# ---- Label resolver
# -----------------------------------------------------------------------------
def format_label(completion, sig):
    """Build the display label for a completion.

    A function or method with at least one signature is rendered as
    ``name(param1, param2, ...)`` using the parameter names of the first
    signature.  Anything else is labelled with the bare completion name.
    """
    if not sig or completion.type not in ("function", "method"):
        return completion.name

    first_signature = sig[0]
    joined_names = ", ".join(param.name for param in first_signature.params)
    return f"{completion.name}({joined_names})"


# Module-level resolver for completion labels: values are produced by
# format_label, and an empty string is returned when resolution raises.
LABEL_RESOLVER = Resolver(callback=format_label, resolve_on_error="")


# ---- Snippets resolver
# -----------------------------------------------------------------------------
def format_snippet(completion, sig):
    """Build the insert-text fields of a completion item from its first signature.

    Parameters whose description contains ``=`` (presumably those with a
    default value -- confirm against Jedi) and the bare ``/`` and ``*``
    markers are left out.  Depending on how many parameters remain:

    * none: plain ``name()`` text, no insert-text format is set;
    * one: a snippet ``name($0)`` with the cursor between the parentheses;
    * several: a snippet with one numbered placeholder per parameter,
      e.g. ``name(${1:a}, ${2:b})$0``.

    Returns an empty dict when ``sig`` is empty or ``None``.
    """
    if not sig:
        return {}

    marker_names = {"/", "*"}
    required = [
        param
        for param in sig[0].params
        if not ("=" in param.description or param.name in marker_names)
    ]

    if not required:
        return {"insertText": completion.name + "()"}

    snippet_completion = {"insertTextFormat": lsp.InsertTextFormat.Snippet}
    if len(required) == 1:
        snippet_completion["insertText"] = completion.name + "($0)"
        return snippet_completion

    # For completions with several params, emit one tab stop per parameter.
    opening = completion.name + "("
    placeholders = ", ".join(
        "${%s:%s}" % (position, param.name)
        for position, param in enumerate(required, start=1)
    )
    snippet_completion["insertText"] = opening + placeholders + ")$0"
    return snippet_completion


# Module-level resolver for completion snippets: values are produced by
# format_snippet, and an empty dict is returned when resolution raises.
SNIPPET_RESOLVER = Resolver(callback=format_snippet, resolve_on_error={})