forked from DonJayamanne/pythonVSCode
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathhelpers.py
More file actions
378 lines (321 loc) · 14.2 KB
/
helpers.py
File metadata and controls
378 lines (321 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import contextlib
import io
import json
import os
import pathlib
import select
import socket
import subprocess
import sys
import tempfile
import threading
import uuid
from typing import Any, Dict, List, Optional, Tuple
script_dir = pathlib.Path(__file__).parent.parent.parent
script_dir_child = pathlib.Path(__file__).parent.parent
sys.path.append(os.fspath(script_dir))
sys.path.append(os.fspath(script_dir_child))
sys.path.append(os.fspath(script_dir / "lib" / "python"))
print("sys add path", script_dir)
TEST_DATA_PATH = pathlib.Path(__file__).parent / ".data"
CONTENT_LENGTH: str = "Content-Length:"
CONTENT_TYPE: str = "Content-Type:"
@contextlib.contextmanager
def text_to_python_file(text_file_path: pathlib.Path):
"""Convert a text file to a python file and yield the python file path."""
python_file = None
try:
contents = text_file_path.read_text(encoding="utf-8")
python_file = text_file_path.with_suffix(".py")
python_file.write_text(contents, encoding="utf-8")
yield python_file
finally:
if python_file:
python_file.unlink()
@contextlib.contextmanager
def create_symlink(root: pathlib.Path, target_ext: str, destination_ext: str):
destination = None
try:
destination = root / destination_ext
target = root / target_ext
if destination and destination.exists():
print("destination already exists", destination)
try:
destination.symlink_to(target)
except Exception as e:
print("error occurred when attempting to create a symlink", e)
yield target, destination
finally:
if destination and destination.exists():
destination.unlink()
print("destination unlinked", destination)
def process_data_received(data: str) -> List[Dict[str, Any]]:
"""Process the all JSON data which comes from the server.
After listen is finished, this function will be called.
Here the data must be split into individual JSON messages and then parsed.
This function also:
- Checks that the jsonrpc value is 2.0
"""
json_messages = []
remaining = data
while remaining:
json_data, remaining = parse_rpc_message(remaining)
# here json_data is a single rpc payload, now check its jsonrpc 2 and save the param data
if "params" not in json_data or "jsonrpc" not in json_data:
raise ValueError("Invalid JSON-RPC message received, missing params or jsonrpc key")
elif json_data["jsonrpc"] != "2.0":
raise ValueError("Invalid JSON-RPC version received, not version 2.0")
else:
json_messages.append(json_data["params"])
return json_messages # return the list of json messages
def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]:
"""Process the JSON data which comes from the server.
A single rpc payload is in the format:
content-length: #LEN# \r\ncontent-type: application/json\r\n\r\n{"jsonrpc": "2.0", "params": ENTIRE_DATA}
returns:
json_data: A single rpc payload of JSON data from the server.
remaining: The remaining data after the JSON data.
"""
str_stream: io.StringIO = io.StringIO(data)
length: int = 0
while True:
line: str = str_stream.readline()
if CONTENT_LENGTH.lower() in line.lower():
length = int(line[len(CONTENT_LENGTH) :])
line: str = str_stream.readline()
if CONTENT_TYPE.lower() not in line.lower():
raise ValueError("Header does not contain Content-Type")
line = str_stream.readline()
if line not in ["\r\n", "\n"]:
raise ValueError("Header does not contain space to separate header and body")
# if it passes all these checks then it has the right headers
break
if not line or line.isspace():
raise ValueError("Header does not contain Content-Length")
while True: # keep reading until the number of bytes is the CONTENT_LENGTH
line: str = str_stream.readline(length)
try:
# try to parse the json, if successful it is single payload so return with remaining data
json_data: dict[str, str] = json.loads(line)
return json_data, str_stream.read()
except json.JSONDecodeError:
print("json decode error")
if sys.platform == "win32":
from namedpipe import NPopen
@contextlib.contextmanager
def pipe_setup_and_listen(pipe_name: str, result: List[str]):
# For Windows, named pipes have a specific naming convention.
pipe_path = f"\\\\.\\pipe\\{pipe_name}"
with NPopen("r+t", name=pipe_name, bufsize=0) as pipe:
completed = threading.Event()
def listen():
all_data: list = []
stream = pipe.wait()
while True:
# Read data from collection
close = stream.closed
if close:
break
data = stream.readlines()
if not data:
if completed.is_set():
break # Exit loop if completed event is set
else:
try:
# Attempt to accept another connection if the current one closes unexpectedly
print("attempt another connection")
except socket.timeout:
# On timeout, append all collected data to result and return
# result.append("".join(all_data))
return
data_decoded = "".join(data)
all_data.append(data_decoded)
# Append all collected data to result array
result.append("".join(all_data))
thread = threading.Thread(target=listen)
thread.start()
try:
yield pipe_path
finally:
completed.set()
thread.join()
else:
@contextlib.contextmanager
def pipe_setup_and_listen(pipe_name: str, result: List[str]):
# For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory.
xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR")
pipe_path = pathlib.Path(
xdg_runtime_dir if xdg_runtime_dir else tempfile.gettempdir(),
pipe_name,
)
os.mkfifo(pipe_path)
completed = threading.Event()
def listen():
# When using blocking IO, open blocks forever if the subprocess compleates but never
# opens the pipe for writing (which may happen if there is an error early in the
# subprocess.) Hence we go to the effort of using non-blocking io so that we can
# break out of this function if that happens.
fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK)
try:
all_data = bytearray()
while True:
if completed.is_set():
break
# Wait till the pipe has data to read, with a timeout.
rlist, _, _ = select.select([fd], [], [], 0.1)
if rlist:
# Data is available, read it.
data = os.read(fd, 1024)
if not data:
# Empty data indicates EOF.
break
all_data.extend(data)
result.append(all_data.decode())
finally:
os.close(fd)
thread = threading.Thread(target=listen)
thread.start()
try:
yield pipe_path
finally:
completed.set()
thread.join()
def runner(args: List[str]) -> Optional[List[Dict[str, Any]]]:
"""Run a subprocess and a named-pipe to listen for messages at the same time with threading."""
print("\n Running python test subprocess with cwd set to: ", TEST_DATA_PATH)
return runner_with_cwd(args, TEST_DATA_PATH)
def runner_with_cwd(args: List[str], path: pathlib.Path) -> Optional[List[Dict[str, Any]]]:
"""Run a subprocess and a named-pipe to listen for messages at the same time with threading."""
return runner_with_cwd_env(args, path, {})
def split_array_at_item(arr: List[str], item: str) -> Tuple[List[str], List[str]]:
"""
Splits an array into two subarrays at the specified item.
Args:
arr (List[str]): The array to be split.
item (str): The item at which to split the array.
Returns:
Tuple[List[str], List[str]]: A tuple containing two subarrays. The first subarray includes the item and all elements before it. The second subarray includes all elements after the item. If the item is not found, the first subarray is the original array and the second subarray is empty.
"""
if item in arr:
index = arr.index(item)
before = arr[: index + 1]
after = arr[index + 1 :]
return before, after
else:
return arr, []
def runner_with_cwd_env(
args: List[str], path: pathlib.Path, env_add: Dict[str, str]
) -> Optional[List[Dict[str, Any]]]:
"""
Run a subprocess and a named-pipe to listen for messages at the same time with threading.
Includes environment variables to add to the test environment.
"""
process_args: List[str]
pipe_name: str
if "MANAGE_PY_PATH" in env_add and "COVERAGE_ENABLED" not in env_add:
# If we are running Django, generate a unittest-specific pipe name.
process_args = [sys.executable, *args]
pipe_name = generate_random_pipe_name("unittest-discovery-test")
elif "_TEST_VAR_UNITTEST" in env_add:
before_args, after_ids = split_array_at_item(args, "*test*.py")
process_args = [sys.executable, *before_args]
pipe_name = generate_random_pipe_name("unittest-execution-test")
test_ids_pipe = os.fspath(
script_dir / "tests" / "unittestadapter" / ".data" / "coverage_ex" / "10943021.txt"
)
env_add.update({"RUN_TEST_IDS_PIPE": test_ids_pipe})
test_ids_arr = after_ids
with open(test_ids_pipe, "w") as f: # noqa: PTH123
f.write("\n".join(test_ids_arr))
else:
process_args = [sys.executable, "-m", "pytest", "-p", "vscode_pytest", "-s", *args]
pipe_name = generate_random_pipe_name("pytest-discovery-test")
if "COVERAGE_ENABLED" in env_add and "_TEST_VAR_UNITTEST" not in env_add:
if "_PYTEST_MANUAL_PLUGIN_LOAD" in env_add:
# Test manual plugin loading scenario for issue #25590
process_args = [
sys.executable,
"-m",
"pytest",
"--disable-plugin-autoload",
"-p",
"pytest_cov.plugin",
"-p",
"vscode_pytest",
"--cov=.",
"--cov-branch",
"-s",
*args,
]
else:
process_args = [
sys.executable,
"-m",
"pytest",
"-p",
"vscode_pytest",
"--cov=.",
"--cov-branch",
"-s",
*args,
]
result = [] # result is a string array to store the data during threading
with pipe_setup_and_listen(pipe_name, result) as pipe_path:
env = os.environ.copy()
env.update(
{
"TEST_RUN_PIPE": pipe_path,
"PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent),
}
)
# if additional environment variables are passed, add them to the environment
if env_add:
env.update(env_add)
subprocess.run(process_args, env=env, cwd=path)
return process_data_received(result[0]) if result else None
def find_test_line_number(test_name: str, test_file_path) -> str:
"""Function which finds the correct line number for a test by looking for the "test_marker--[test_name]" string.
The test_name is split on the "[" character to remove the parameterization information.
Args:
test_name: The name of the test to find the line number for, will be unique per file.
test_file_path: The path to the test file where the test is located.
"""
test_file_unique_id: str = "test_marker--" + test_name.split("[")[0]
with open(test_file_path) as f: # noqa: PTH123
for i, line in enumerate(f):
if test_file_unique_id in line:
return str(i + 1)
error_str: str = f"Test {test_name!r} not found on any line in {test_file_path}"
raise ValueError(error_str)
def find_class_line_number(class_name: str, test_file_path) -> str:
"""Function which finds the correct line number for a class definition.
Args:
class_name: The name of the class to find the line number for.
test_file_path: The path to the test file where the class is located.
"""
# Look for the class definition line (or function for pytest-describe)
with open(test_file_path) as f: # noqa: PTH123
for i, line in enumerate(f):
# Match "class ClassName" or "class ClassName(" or "class ClassName:"
# Also match "def ClassName(" for pytest-describe blocks
if (
line.strip().startswith(f"class {class_name}")
or line.strip().startswith(f"class {class_name}(")
or line.strip().startswith(f"def {class_name}(")
):
return str(i + 1)
error_str: str = f"Class {class_name!r} not found on any line in {test_file_path}"
raise ValueError(error_str)
def get_absolute_test_id(test_id: str, test_path: pathlib.Path) -> str:
"""Get the absolute test id by joining the testPath with the test_id."""
split_id = test_id.split("::")[1:]
return "::".join([str(test_path), *split_id])
def generate_random_pipe_name(prefix=""):
# Generate a random suffix using UUID4, ensuring uniqueness.
random_suffix = uuid.uuid4().hex[:10]
# Default prefix if not provided.
if not prefix:
prefix = "python-ext-rpc"
return f"{prefix}-{random_suffix}"