-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathpython-ssrf.mdc
More file actions
520 lines (424 loc) · 24.5 KB
/
python-ssrf.mdc
File metadata and controls
520 lines (424 loc) · 24.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
---
description: Detect and prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications as defined in OWASP Top 10:2021-A10
globs: *.py
alwaysApply: false
---
# Python Server-Side Request Forgery (SSRF) Standards (OWASP A10:2021)
This rule enforces security best practices to prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications, as defined in OWASP Top 10:2021-A10.
<rule>
name: python_ssrf
description: Detect and prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications as defined in OWASP Top 10:2021-A10
filters:
- type: file_extension
pattern: "\\.py$"
- type: file_path
pattern: ".*"
actions:
- type: enforce
conditions:
# Pattern 1: Detect direct use of requests library with user input
- pattern: "requests\\.(get|post|put|delete|head|options|patch)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in HTTP requests. Implement URL validation and allowlisting."
# Pattern 2: Detect urllib usage with user input
- pattern: "urllib\\.(request|parse)\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in urllib functions. Implement URL validation and allowlisting."
# Pattern 3: Detect http.client usage with user input
- pattern: "http\\.client\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in http.client functions. Implement URL validation and allowlisting."
# Pattern 4: Detect aiohttp usage with user input
- pattern: "aiohttp\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in aiohttp functions. Implement URL validation and allowlisting."
# Pattern 5: Detect httpx usage with user input
- pattern: "httpx\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in httpx functions. Implement URL validation and allowlisting."
# Pattern 6: Detect pycurl usage with user input
- pattern: "pycurl\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in pycurl functions. Implement URL validation and allowlisting."
# Pattern 7: Detect subprocess calls with user input that might lead to SSRF
- pattern: "subprocess\\.(Popen|call|run|check_output|check_call)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in subprocess calls, which might lead to SSRF. Validate and sanitize input."
# Pattern 8: Detect os.system calls with user input that might lead to SSRF
- pattern: "os\\.(system|popen|spawn)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in OS commands, which might lead to SSRF. Validate and sanitize input."
# Pattern 9: Detect URL construction with user input
- pattern: "(f|r)[\"\']https?://[^\"\']*?\\{[^\\}]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in URL construction. Implement URL validation and allowlisting."
# Pattern 10: Detect URL joining with user input
- pattern: "urljoin\\([^,]+,[^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in URL joining. Implement URL validation and allowlisting."
# Pattern 11: Detect file opening with user input (potential local SSRF)
- pattern: "open\\([^,]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential local SSRF vulnerability detected. User-controlled input is being used in file operations. Validate file paths and use path sanitization."
# Pattern 12: Detect XML/YAML parsing with user input (potential XXE leading to SSRF)
- pattern: "(ET\\.fromstring|ET\\.parse|ET\\.XML|minidom\\.parse|parseString|yaml\\.load)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential XXE vulnerability that could lead to SSRF detected. User-controlled input is being used in XML/YAML parsing. Use safe parsing methods and disable external entities."
# Pattern 13: Detect socket connections with user input
- pattern: "socket\\.(socket|create_connection)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in socket connections. Implement host/port validation and allowlisting."
# Pattern 14: Detect FTP connections with user input
- pattern: "ftplib\\.FTP\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in FTP connections. Implement host validation and allowlisting."
# Pattern 15: Detect missing URL validation before making requests
- pattern: "def\\s+\\w+\\([^)]*?\\):[^\\n]*?\\n(?:[^\\n]*?\\n)*?[^\\n]*?requests\\.(get|post|put|delete|head|options|patch)\\([^)]*?url\\s*=\\s*[^\\n]*?(?!.*?validate_url)"
message: "Missing URL validation before making HTTP requests. Implement URL validation with allowlisting to prevent SSRF attacks."
- type: suggest
message: |
**Python Server-Side Request Forgery (SSRF) Prevention Best Practices:**
1. **URL Validation and Allowlisting:**
- Implement strict URL validation
- Use allowlists for domains, IP ranges, and protocols
- Example implementation:
```python
import re
import socket
import ipaddress
from urllib.parse import urlparse
def is_valid_url(url, allowed_domains=None, allowed_protocols=None, block_private_ips=True):
"""
Validate URLs against allowlists and block private IPs.
Args:
url (str): The URL to validate
allowed_domains (list): List of allowed domains
allowed_protocols (list): List of allowed protocols
block_private_ips (bool): Whether to block private IPs
Returns:
bool: True if URL is valid according to rules
"""
if not url:
return False
# Default allowlists if none provided
if allowed_domains is None:
allowed_domains = ["example.com", "api.example.com"]
if allowed_protocols is None:
allowed_protocols = ["https"]
try:
# Parse URL
parsed_url = urlparse(url)
# Check protocol
if parsed_url.scheme not in allowed_protocols:
return False
# Check domain against allowlist
if parsed_url.netloc not in allowed_domains:
return False
# Block private IPs if enabled
if block_private_ips:
hostname = parsed_url.netloc.split(':')[0]
try:
ip_addresses = socket.getaddrinfo(
hostname, None, socket.AF_INET, socket.SOCK_STREAM
)
for family, socktype, proto, canonname, sockaddr in ip_addresses:
ip = sockaddr[0]
ip_obj = ipaddress.ip_address(ip)
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved:
return False
except socket.gaierror:
# DNS resolution failed
return False
return True
except Exception:
return False
# Usage example
def fetch_resource(resource_url):
if not is_valid_url(resource_url):
raise ValueError("Invalid or disallowed URL")
# Proceed with request
import requests
return requests.get(resource_url)
```
2. **Implement Network-Level Controls:**
- Use network-level allowlists
- Configure firewalls to block outbound requests to internal resources
- Example with proxy configuration:
```python
import requests
def safe_request(url):
# Configure proxy that implements URL filtering
proxies = {
'http': 'http://ssrf-protecting-proxy:8080',
'https': 'http://ssrf-protecting-proxy:8080'
}
# Set timeout to prevent long-running requests
timeout = 10
try:
return requests.get(url, proxies=proxies, timeout=timeout)
except requests.exceptions.RequestException as e:
# Log the error and handle gracefully
logging.error(f"Request failed: {e}")
return None
```
3. **Use Safe Libraries and Wrappers:**
- Create wrapper functions for HTTP requests
- Implement consistent security controls
- Example wrapper:
```python
import requests
from urllib.parse import urlparse
class SafeRequestHandler:
def __init__(self, allowed_domains=None, allowed_protocols=None):
self.allowed_domains = allowed_domains or ["api.example.com"]
self.allowed_protocols = allowed_protocols or ["https"]
def validate_url(self, url):
parsed_url = urlparse(url)
# Validate protocol
if parsed_url.scheme not in self.allowed_protocols:
return False
# Validate domain
if parsed_url.netloc not in self.allowed_domains:
return False
return True
def request(self, method, url, **kwargs):
if not self.validate_url(url):
raise ValueError(f"URL validation failed for: {url}")
# Set sensible defaults
kwargs.setdefault('timeout', 10)
# Make the request
return requests.request(method, url, **kwargs)
def get(self, url, **kwargs):
return self.request('GET', url, **kwargs)
def post(self, url, **kwargs):
return self.request('POST', url, **kwargs)
# Usage
safe_requests = SafeRequestHandler()
response = safe_requests.get('https://api.example.com/data')
```
4. **Disable Redirects or Implement Redirect Validation:**
- Disable automatic redirects
- Validate each redirect location
- Example:
```python
import requests
def safe_request_with_redirect_validation(url, allowed_domains):
# Disable automatic redirects
session = requests.Session()
response = session.get(url, allow_redirects=False)
# Handle redirects manually with validation
redirect_count = 0
max_redirects = 5
while 300 <= response.status_code < 400 and redirect_count < max_redirects:
redirect_url = response.headers.get('Location')
# Validate redirect URL
parsed_url = urlparse(redirect_url)
if parsed_url.netloc not in allowed_domains:
raise ValueError(f"Redirect to disallowed domain: {parsed_url.netloc}")
# Follow the redirect with validation
redirect_count += 1
response = session.get(redirect_url, allow_redirects=False)
return response
```
5. **Use Metadata Instead of Direct URLs:**
- Use resource identifiers instead of URLs
- Resolve identifiers server-side
- Example:
```python
def fetch_resource_by_id(resource_id):
# Map of allowed resources
resource_map = {
"user_profile": "https://api.example.com/profiles/",
"product_data": "https://api.example.com/products/",
"weather_info": "https://api.weather.com/forecast/"
}
# Check if resource_id is in allowed list
if resource_id not in resource_map:
raise ValueError(f"Unknown resource ID: {resource_id}")
# Construct URL from safe base + ID
base_url = resource_map[resource_id]
return requests.get(base_url)
```
6. **Implement Response Handling Controls:**
- Sanitize and validate responses
- Prevent response data from being used in further requests
- Example:
```python
def safe_request_with_response_validation(url):
response = requests.get(url)
# Check response size
if len(response.content) > MAX_RESPONSE_SIZE:
raise ValueError("Response too large")
# Validate content type
content_type = response.headers.get('Content-Type', '')
if not content_type.startswith('application/json'):
raise ValueError(f"Unexpected content type: {content_type}")
# Parse and validate JSON structure
try:
data = response.json()
# Validate expected structure
if 'result' not in data:
raise ValueError("Invalid response structure")
return data
except ValueError:
raise ValueError("Invalid JSON response")
```
7. **Use Timeouts and Circuit Breakers:**
- Set appropriate timeouts
- Implement circuit breakers for failing services
- Example:
```python
import requests
from requests.exceptions import Timeout, ConnectionError
def request_with_circuit_breaker(url, max_retries=3, timeout=5):
retries = 0
while retries < max_retries:
try:
return requests.get(url, timeout=timeout)
except (Timeout, ConnectionError) as e:
retries += 1
if retries >= max_retries:
# Circuit is now open
raise ValueError(f"Circuit breaker open for {url}: {str(e)}")
# Exponential backoff
time.sleep(2 ** retries)
```
8. **Implement Proper Logging and Monitoring:**
- Log all outbound requests
- Monitor for unusual patterns
- Example:
```python
import logging
import requests
def logged_request(url, **kwargs):
# Log the outbound request
logging.info(f"Outbound request to: {url}")
try:
response = requests.get(url, **kwargs)
# Log the response
logging.info(f"Response from {url}: status={response.status_code}")
return response
except Exception as e:
# Log the error
logging.error(f"Request to {url} failed: {str(e)}")
raise
```
9. **Use DNS Resolution Controls:**
- Implement DNS resolution controls
- Block internal DNS names
- Example:
```python
import socket
import ipaddress
def is_safe_host(hostname):
try:
# Resolve hostname to IP
ip_addresses = socket.getaddrinfo(
hostname, None, socket.AF_INET, socket.SOCK_STREAM
)
for family, socktype, proto, canonname, sockaddr in ip_addresses:
ip = sockaddr[0]
ip_obj = ipaddress.ip_address(ip)
# Check if IP is private/internal
if (ip_obj.is_private or ip_obj.is_loopback or
ip_obj.is_link_local or ip_obj.is_reserved):
return False
return True
except (socket.gaierror, ValueError):
return False
def safe_request_with_dns_check(url):
parsed_url = urlparse(url)
hostname = parsed_url.netloc.split(':')[0]
if not is_safe_host(hostname):
raise ValueError(f"Hostname resolves to unsafe IP: {hostname}")
return requests.get(url)
```
10. **Implement Defense in Depth:**
- Combine multiple protection mechanisms
- Don't rely on a single control
- Example comprehensive approach:
```python
class SSRFProtectedClient:
def __init__(self):
self.allowed_domains = ["api.example.com", "cdn.example.com"]
self.allowed_protocols = ["https"]
self.max_redirects = 3
self.timeout = 10
def is_safe_url(self, url):
# URL validation
parsed_url = urlparse(url)
# Protocol check
if parsed_url.scheme not in self.allowed_protocols:
return False
# Domain check
if parsed_url.netloc not in self.allowed_domains:
return False
# DNS resolution check
hostname = parsed_url.netloc.split(':')[0]
try:
ip_addresses = socket.getaddrinfo(
hostname, None, socket.AF_INET, socket.SOCK_STREAM
)
for family, socktype, proto, canonname, sockaddr in ip_addresses:
ip = sockaddr[0]
ip_obj = ipaddress.ip_address(ip)
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved:
return False
except socket.gaierror:
return False
return True
def request(self, method, url, **kwargs):
# Validate URL
if not self.is_safe_url(url):
raise ValueError(f"URL failed security validation: {url}")
# Set sensible defaults
kwargs.setdefault('timeout', self.timeout)
kwargs.setdefault('allow_redirects', False)
# Make initial request
session = requests.Session()
response = session.request(method, url, **kwargs)
# Handle redirects manually with validation
redirect_count = 0
while 300 <= response.status_code < 400 and redirect_count < self.max_redirects:
redirect_url = response.headers.get('Location')
# Validate redirect URL
if not self.is_safe_url(redirect_url):
raise ValueError(f"Redirect URL failed security validation: {redirect_url}")
# Follow the redirect with validation
redirect_count += 1
response = session.request(method, redirect_url, **kwargs)
# Log the request
logging.info(f"{method} request to {url} completed with status {response.status_code}")
return response
def get(self, url, **kwargs):
return self.request('GET', url, **kwargs)
def post(self, url, **kwargs):
return self.request('POST', url, **kwargs)
# Usage
client = SSRFProtectedClient()
response = client.get('https://api.example.com/data')
```
- type: validate
conditions:
# Check 1: URL validation implementation
- pattern: "def\\s+is_valid_url|def\\s+validate_url"
message: "URL validation function is implemented."
# Check 2: Allowlist implementation
- pattern: "allowed_domains|allowed_urls|ALLOWED_HOSTS|whitelist"
message: "URL allowlisting is implemented."
# Check 3: Safe request wrapper
- pattern: "class\\s+\\w+Request|def\\s+safe_request"
message: "Safe request wrapper is implemented."
# Check 4: IP address validation
- pattern: "ipaddress\\.ip_address|is_private|is_loopback|is_reserved"
message: "IP address validation is implemented to prevent access to internal resources."
metadata:
priority: high
version: 1.0
tags:
- security
- python
- ssrf
- owasp
- language:python
- framework:django
- framework:flask
- framework:fastapi
- category:security
- subcategory:ssrf
- standard:owasp-top10
- risk:a10-server-side-request-forgery
references:
- "https://owasp.org/Top10/A10_2021-Server-Side_Request_Forgery_%28SSRF%29/"
- "https://cheatsheetseries.owasp.org/cheatsheets/Server_Side_Request_Forgery_Prevention_Cheat_Sheet.html"
- "https://portswigger.net/web-security/ssrf"
- "https://docs.python.org/3/library/urllib.request.html"
- "https://docs.python-requests.org/en/latest/user/advanced/#ssl-cert-verification"
- "https://docs.python.org/3/library/ipaddress.html"
</rule>