|
1 | | -import os |
2 | | -import sysconfig |
3 | 1 | from typing import Optional |
4 | 2 | from typing import Union |
5 | 3 |
|
|
8 | 6 | from ddtrace.appsec._iast._taint_tracking import get_ranges |
9 | 7 | from ddtrace.appsec._iast.sampling.vulnerability_detection import rollback_quota |
10 | 8 | from ddtrace.appsec._iast.sampling.vulnerability_detection import should_process_vulnerability |
11 | | -from ddtrace.appsec._shared._stacktrace import get_info_frame |
| 9 | +from ddtrace.appsec._patch_utils import get_caller_frame_info |
12 | 10 | from ddtrace.appsec._trace_utils import _asm_manual_keep |
13 | 11 | from ddtrace.internal import core |
14 | 12 | from ddtrace.internal.logger import get_logger |
|
28 | 26 |
|
29 | 27 | log = get_logger(__name__) |
30 | 28 |
|
31 | | -CWD = os.path.abspath(os.getcwd()) |
32 | | - |
33 | 29 | TEXT_TYPES = Union[str, bytes, bytearray] |
34 | 30 |
|
35 | | -PURELIB_PATH = sysconfig.get_path("purelib") |
36 | | -STDLIB_PATH = sysconfig.get_path("stdlib") |
37 | | - |
38 | 31 |
|
39 | 32 | class taint_sink_deduplication(deduplication): |
40 | 33 | def _check_deduplication(self): |
@@ -111,40 +104,6 @@ def _prepare_report( |
111 | 104 |
|
112 | 105 | return True |
113 | 106 |
|
114 | | - @classmethod |
115 | | - def _compute_file_line(cls) -> tuple[Optional[str], Optional[int], Optional[str], Optional[str]]: |
116 | | - file_name = line_number = function_name = class_name = None |
117 | | - frame_info = get_info_frame() |
118 | | - if not frame_info or frame_info[0] in ("", -1): |
119 | | - return file_name, line_number, function_name, class_name |
120 | | - |
121 | | - file_name, line_number, function_name, class_name = frame_info |
122 | | - if not file_name: |
123 | | - return None, None, None, None |
124 | | - |
125 | | - file_name = cls._rel_path(file_name) |
126 | | - if not file_name: |
127 | | - log.debug("Could not relativize vulnerability location path: %s", frame_info[0]) |
128 | | - return None, None, None, None |
129 | | - |
130 | | - return file_name, line_number, function_name, class_name |
131 | | - |
132 | | - @staticmethod |
133 | | - def _rel_path(file_name: str) -> str: |
134 | | - file_name_norm = file_name.replace("\\", "/") |
135 | | - if file_name_norm.startswith(PURELIB_PATH): |
136 | | - return os.path.relpath(file_name_norm, start=PURELIB_PATH) |
137 | | - |
138 | | - if file_name_norm.startswith(STDLIB_PATH): |
139 | | - return os.path.relpath(file_name_norm, start=STDLIB_PATH) |
140 | | - if file_name_norm.startswith(CWD): |
141 | | - return os.path.relpath(file_name_norm, start=CWD) |
142 | | - # If the path contains site-packages anywhere, return 'site-packages/<rest>' |
143 | | - # Normalize separators to forward slashes for consistency |
144 | | - if (idx := file_name_norm.find("/site-packages/")) != -1: |
145 | | - return file_name_norm[idx:] |
146 | | - return "" |
147 | | - |
148 | 107 | @classmethod |
149 | 108 | def _create_evidence_and_report( |
150 | 109 | cls, |
@@ -177,7 +136,7 @@ def report(cls, evidence_value: TEXT_TYPES = "", dialect: Optional[str] = None) |
177 | 136 | file_name = line_number = function_name = class_name = None |
178 | 137 |
|
179 | 138 | if not getattr(cls, "skip_location", False): |
180 | | - file_name, line_number, function_name, class_name = cls._compute_file_line() |
| 139 | + file_name, line_number, function_name, class_name = get_caller_frame_info() |
181 | 140 | if file_name is None: |
182 | 141 | rollback_quota(cls.vulnerability_type) |
183 | 142 | return result |
|
0 commit comments