forked from malariagen/malariagen-data-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsafe_query.py
More file actions
145 lines (123 loc) · 4.14 KB
/
safe_query.py
File metadata and controls
145 lines (123 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""Safe query validation for pandas eval/query expressions.
This module provides AST-based validation of query strings to prevent
arbitrary code execution via pandas DataFrame.eval() and DataFrame.query().
Only a restricted subset of Python expressions is allowed:
- Boolean operators: and, or, not
- Comparison operators: ==, !=, <, <=, >, >=, in, not in, is, is not
- Arithmetic operators: +, -, *, /, //, %, **
- Unary operators: +, -, ~, not
- Constants: strings, numbers, booleans, None
- Names: must match an allowlist of known column names (if provided)
- Parenthesized expressions, and tuple and list literals (e.g., x in (1, 2))
Forbidden constructs include:
- Function calls (e.g., __import__('os'))
- Attribute access (e.g., os.system)
- Subscript/indexing (e.g., x[0])
- Comprehensions, lambdas, f-strings, starred expressions
- Any identifier containing double underscores (__)
"""
import ast
from typing import Optional, Set
# Allowlist of AST node classes permitted in a query expression. Validation
# rejects any node whose class is not listed here.
_SAFE_NODE_TYPES = (
    # Expression root and compound expression forms.
    ast.Expression, ast.BoolOp, ast.BinOp, ast.UnaryOp, ast.Compare,
    # Boolean operators (``not`` appears as a unary operator node).
    ast.And, ast.Or, ast.Not,
    # Binary arithmetic operators.
    ast.Add, ast.Sub, ast.Mult, ast.Div, ast.FloorDiv, ast.Mod, ast.Pow,
    # Unary arithmetic / bitwise operators.
    ast.USub, ast.UAdd, ast.Invert,
    # Comparison operators.
    ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
    ast.In, ast.NotIn, ast.Is, ast.IsNot,
    # Leaf nodes, the load context marker, and literal containers.
    ast.Constant, ast.Name, ast.Load, ast.Tuple, ast.List,
)
class UnsafeQueryError(ValueError):
    """Signal that a query string failed safety validation.

    Subclasses ``ValueError``, so handlers written for ``ValueError`` also
    catch rejected queries.
    """
def _validate_node(node: ast.AST, allowed_names: Optional[Set[str]] = None) -> None:
    """Validate that an AST node and all of its descendants are safe.

    The tree is walked iteratively (pre-order, children in source order)
    using an explicit stack rather than recursion, so a tree nested more
    deeply than the interpreter's recursion limit is still validated instead
    of failing with ``RecursionError``.

    Parameters
    ----------
    node : ast.AST
        The root AST node to validate.
    allowed_names : set of str, optional
        If provided, restrict identifier names to this set.

    Raises
    ------
    UnsafeQueryError
        If the node or any of its descendants is of a type not listed in
        ``_SAFE_NODE_TYPES``, is an identifier containing ``__``, or is an
        identifier that is missing from ``allowed_names``.
    """
    pending = [node]
    while pending:
        current = pending.pop()

        if not isinstance(current, _SAFE_NODE_TYPES):
            raise UnsafeQueryError(
                f"Unsafe expression: {type(current).__name__} nodes are not allowed "
                "in query strings. Only comparisons, boolean logic, and constants "
                "are permitted."
            )

        if isinstance(current, ast.Name):
            name = current.id

            # Block dunder identifiers before consulting the allowlist.
            if "__" in name:
                raise UnsafeQueryError(
                    f"Unsafe expression: identifier '{name}' contains double "
                    "underscores and is not allowed in query strings."
                )

            # Enforce the allowlist if one was provided. The literal names
            # are always tolerated; on Python 3 the parser emits Constant
            # nodes for them, so this exemption only matters for trees that
            # contain explicit Name nodes with these identifiers.
            if (
                allowed_names is not None
                and name not in allowed_names
                and name not in {"True", "False", "None"}
            ):
                raise UnsafeQueryError(
                    f"Unknown column name '{name}' in query string. "
                    f"Allowed column names: {sorted(allowed_names)}"
                )

        # Push children in reverse so they are popped, and therefore
        # validated, in source order; this reports the same first violation
        # a recursive pre-order walk would.
        pending.extend(reversed(list(ast.iter_child_nodes(current))))
def validate_query(query: str, allowed_names: Optional[Set[str]] = None) -> None:
    """Validate that a query string is safe for use with pandas eval/query.

    Every failure mode is reported as ``UnsafeQueryError``, including parser
    failures other than ``SyntaxError`` and expressions that are too deeply
    nested to process.

    Parameters
    ----------
    query : str
        The query string to validate.
    allowed_names : set of str, optional
        If provided, restrict identifier names to this set of known column
        names. If None, any identifier (except those containing ``__``) is
        allowed.

    Raises
    ------
    UnsafeQueryError
        If the query is not a string, is empty, cannot be parsed as a single
        expression, or contains unsafe constructs such as function calls,
        attribute access, or dunder identifiers.
    """
    if not isinstance(query, str):
        raise UnsafeQueryError(f"Query must be a string, got {type(query).__name__}.")

    query = query.strip()
    if not query:
        raise UnsafeQueryError("Query string must not be empty.")

    try:
        tree = ast.parse(query, mode="eval")
    except SyntaxError as e:
        raise UnsafeQueryError(f"Query string is not a valid expression: {e}") from e
    except (ValueError, RecursionError, MemoryError) as e:
        # The parser can reject pathological input with these instead of
        # SyntaxError (e.g. embedded null bytes on some Python versions, or
        # extreme nesting). Report them uniformly so callers only need to
        # handle UnsafeQueryError.
        raise UnsafeQueryError(f"Query string could not be parsed: {e}") from e

    try:
        _validate_node(tree, allowed_names)
    except RecursionError as e:
        # A tree that parsed successfully can still be too deep to walk
        # if the validator recurses; treat that as a rejected query.
        raise UnsafeQueryError(
            "Query string is too deeply nested to validate."
        ) from e