File: test-sourcekit-lsp.py

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (240 lines) | stat: -rw-r--r-- 7,881 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# Canary test for sourcekit-lsp, covering interaction with swiftpm and toolchain
# language services.

# REQUIRES: have-sourcekit-lsp

# Make a sandbox dir.
# RUN: rm -rf %t.dir
# RUN: mkdir -p %t.dir
# RUN: cp -r %S/pkg %t.dir/

# RUN: env SWIFTPM_ENABLE_CLANG_INDEX_STORE=1 %{swift-build} --package-path %t.dir/pkg -Xswiftc -index-ignore-system-modules -v 2>&1 | tee %t.build-log
# RUN: %{FileCheck} --check-prefix CHECK-BUILD-LOG --input-file %t.build-log %s
# CHECK-BUILD-LOG-NOT: error:

# RUN: %{python} -u %s %{sourcekit-lsp} %t.dir/pkg | tee %t.run-log
# RUN: %{FileCheck} --input-file %t.run-log %s

from typing import Dict
import argparse
import json
import subprocess
import sys
from pathlib import Path
import re


class LspConnection:
    def __init__(self, server_path: str):
        self.request_id = 0
        self.process = subprocess.Popen(
            [server_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            encoding="utf-8",
        )

    def send_data(self, dict: Dict[str, object]):
        """
        Encode the given dict as JSON and send it to the LSP server with the 'Content-Length' header.
        """
        assert self.process.stdin
        body = json.dumps(dict)
        data = "Content-Length: {}\r\n\r\n{}".format(len(body), body)
        self.process.stdin.write(data)
        self.process.stdin.flush()

    def read_message_from_lsp_server(self) -> str:
        """
        Read a single message sent from the LSP server to the client.
        This can be a request reply, notification or request sent from the server to the client.
        """
        assert self.process.stdout
        # Read Content-Length: 123\r\n
        # Note: Even though the Content-Length header ends with \r\n, `readline` returns it with a single \n.
        header = self.process.stdout.readline()
        match = re.match(r"Content-Length: ([0-9]+)\n$", header)
        assert match, f"Expected Content-Length header, got '{header}'"

        # The Content-Length header is followed by an empty line
        empty_line = self.process.stdout.readline()
        assert empty_line == "\n", f"Expected empty line, got '{empty_line}'"

        # Read the actual response
        return self.process.stdout.read(int(match.group(1)))

    def read_request_reply_from_lsp_server(self, request_id: int) -> str:
        """
        Read all messages sent from the LSP server until we see a request reply.
        Assert that this request reply was for the given request_id and return it.
        """
        message = self.read_message_from_lsp_server()
        message_obj = json.loads(message)
        if "result" not in message_obj:
            # We received a message that wasn't the request reply. 
            # Log it, ignore it and wait for the next message.
            print("Received message")
            print(message)
            return self.read_request_reply_from_lsp_server(request_id)
        # We always wait for a request reply before sending the next request.
        # If we received a request reply, it should thus have the request ID of the last request that we sent.
        assert (
            message_obj["id"] == self.request_id
        ), f"Expected response for request {self.request_id}, got '{message}'"
        return message

    def send_request(self, method: str, params: Dict[str, object]) -> str:
        """
        Send a request of the given method and parameters to the LSP server and wait for the response.
        """
        self.request_id += 1

        self.send_data(
            {
                "jsonrpc": "2.0",
                "id": self.request_id,
                "method": method,
                "params": params,
            }
        )

        return self.read_request_reply_from_lsp_server(self.request_id)

    def send_notification(self, method: str, params: Dict[str, object]):
        """
        Send a notification to the LSP server. There's nothing to wait for in response
        """
        self.send_data({"jsonrpc": "2.0", "method": method, "params": params})

    def wait_for_exit(self, timeout: int) -> int:
        """
        Wait for the LSP server to terminate.
        """
        return self.process.wait(timeout)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("sourcekit_lsp")
    parser.add_argument("package")
    args = parser.parse_args()

    package_dir = Path(args.package)
    main_swift = package_dir / "Sources" / "exec" / "main.swift"
    clib_c = package_dir / "Sources" / "clib" / "clib.c"

    connection = LspConnection(args.sourcekit_lsp)
    connection.send_request(
        "initialize",
        {
            "rootPath": args.package,
            "capabilities": {},
            "initializationOptions": {
                "listenToUnitEvents": False,
            },
        },
    )

    connection.send_notification(
        "textDocument/didOpen",
        {
            "textDocument": {
                "uri": f"file://{main_swift}",
                "languageId": "swift",
                "version": 0,
                "text": main_swift.read_text(),
            }
        },
    )

    connection.send_request("workspace/_pollIndex", {})
    foo_definition_response = connection.send_request(
        "textDocument/definition",
        {
            "textDocument": {"uri": f"file://{main_swift}"},
            "position": {"line": 3, "character": 6},  ## zero-based
        },
    )
    print("foo() definition response")
    # CHECK-LABEL: foo() definition response
    print(foo_definition_response)
    # CHECK: "result":[
    # CHECK-DAG: lib.swift
    # CHECK-DAG: "line":1
    # CHECK-DAG: "character":14
    # CHECK: ]

    clib_func_definition_response = connection.send_request(
        "textDocument/definition",
        {
            "textDocument": {"uri": f"file://{main_swift}"},
            "position": {"line": 4, "character": 0},  ## zero-based
        },
    )

    print("clib_func() definition response")
    # CHECK-LABEL: clib_func() definition response
    print(clib_func_definition_response)
    # CHECK: "result":[
    # CHECK-DAG: clib.c
    # CHECK-DAG: "line":2
    # CHECK-DAG: "character":5
    # CHECK: ]

    swift_completion_response = connection.send_request(
        "textDocument/completion",
        {
            "textDocument": {"uri": f"file://{main_swift}"},
            "position": {"line": 3, "character": 6},  ## zero-based
        },
    )
    print("Swift completion response")
    # CHECK-LABEL: Swift completion response
    print(swift_completion_response)
    # CHECK: "items":[
    # CHECK-DAG: "label":"foo()"
    # CHECK-DAG: "label":"self"
    # CHECK: ]

    connection.send_notification(
        "textDocument/didOpen",
        {
            "textDocument": {
                "uri": f"file://{clib_c}",
                "languageId": "c",
                "version": 0,
                "text": clib_c.read_text(),
            }
        },
    )

    c_completion_response = connection.send_request(
        "textDocument/completion",
        {
            "textDocument": {"uri": f"file://{clib_c}"},
            "position": {"line": 2, "character": 22},  ## zero-based
        },
    )
    print("C completion response")
    # CHECK-LABEL: C completion response
    print(c_completion_response)
    # CHECK: "items":[
    # CHECK-DAG: "insertText":"clib_func"
    # Missing "clib_other" from clangd on rebranch - rdar://73762053
    # DISABLED-DAG: "insertText":"clib_other"
    # CHECK: ]

    connection.send_request("shutdown", {})
    connection.send_notification("exit", {})

    return_code = connection.wait_for_exit(timeout=1)
    if return_code == 0:
        print("OK")
    else:
        print(f"error: sourcekit-lsp exited with code {return_code}")
        sys.exit(1)
    # CHECK: OK


if __name__ == "__main__":
    main()