-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsplunk_with_apisec.py
More file actions
159 lines (132 loc) · 5.35 KB
/
splunk_with_apisec.py
File metadata and controls
159 lines (132 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import requests
import json
import os
import logging
from datetime import datetime, timedelta
# Load config
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
with open(config_path, "r") as config_file:
config = json.load(config_file)
# Function to retrieve bearer token by logging in
def get_bearer_token():
login_url = "https://cloud.apisec.ai/auth/login"
credentials = {
"username": config['auth']['username'],
"password": config['auth']['password']
}
response = requests.post(login_url, json=credentials)
if response.status_code == 200:
token = response.json().get('token')
if token:
return token
else:
logging.error("Bearer token not found in the response.")
return None
else:
logging.error(f"Failed to retrieve bearer token: {response.status_code} - {response.text}")
return None
# Get the bearer token
BEARER_TOKEN = get_bearer_token()
# API base URL and Authorization from config
API_BASE_URL = "https://cloud.apisec.ai/api/v1"
# Common function to make API requests
def fetch_data(api_endpoint, params=None):
headers = {
"Authorization": f"Bearer {BEARER_TOKEN}",
"Content-Type": "application/json"
}
response = requests.get(f"{API_BASE_URL}/{api_endpoint}", headers=headers, params=params)
if response.status_code == 200:
return response.json()
else:
logging.error(f"Error fetching {api_endpoint}: {response.status_code} - {response.text}")
return None
# Function to handle paginated requests using totalPages from response
def fetch_paginated_data(api_endpoint, params=None, pageSize=20):
page = 0
results = []
total_pages = None
while total_pages is None or page < total_pages:
# Add pagination to the params
if params is None:
params = {}
params['pageSize'] = pageSize
params['page'] = page
# Fetch the data for the current page
data = fetch_data(api_endpoint, params)
if not data:
break
# Add the data to results
results.append(data)
# Get totalPages from the first response
if total_pages is None:
total_pages = data.get("totalPages", 1) # Default to 1 page if not available
total_elements = data.get("totalElements", 0) # Just for logging or tracking
logging.info(f"Page {page} fetched {len(data)} records. Total pages: {total_pages}, Total elements: {total_elements}")
# Increment page number to fetch the next page
page += 1
return results
# Function to calculate date range (yesterday as start and end date)
def get_yesterday_date_range():
yesterday = datetime.now() - timedelta(days=1)
start_date = yesterday.strftime('%Y-%m-%d')
end_date = yesterday.strftime('%Y-%m-%d')
return start_date, end_date
# Function for fetching user activity logs
def fetch_user_activity_logs():
if config['user_activity_logs']['enabled']:
# Automatically calculate the startDate and endDate
start_date, end_date = get_yesterday_date_range()
# Fetch user activity logs with dynamic date range
params = {
"startDate": start_date,
"endDate": end_date
}
paginated_data = fetch_paginated_data("user-activity-logs", params)
if paginated_data:
print(json.dumps(paginated_data)) # Indexed as 'user_activity_logs'
# Function for fetching login activity logs
def fetch_login_activity_logs():
if config['login_activity_logs']['enabled']:
# Automatically calculate the startDate and endDate
start_date, end_date = get_yesterday_date_range()
# Fetch login activity logs with dynamic date range
params = {
"startDate": start_date,
"endDate": end_date
}
paginated_data = fetch_paginated_data("login-activity-logs", params)
if paginated_data:
print(json.dumps(paginated_data)) # Indexed as 'login_activity_logs'
# Function for fetching system activity logs
def fetch_system_activity_logs():
if config['system_activity_logs']['enabled']:
# Automatically calculate the startDate and endDate
start_date, end_date = get_yesterday_date_range()
# Fetch system activity logs with dynamic date range
params = {
"startDate": start_date,
"endDate": end_date
}
paginated_data = fetch_paginated_data("system-activity-logs", params)
if paginated_data:
print(json.dumps(paginated_data)) # Indexed as 'system_activity_logs'
# Function for fetching admin activity logs
def fetch_admin_activity_logs():
if config['admin_activity_logs']['enabled']:
# Automatically calculate the startDate and endDate
start_date, end_date = get_yesterday_date_range()
# Fetch admin activity logs with dynamic date range
params = {
"startDate": start_date,
"endDate": end_date
}
paginated_data = fetch_paginated_data("admin-activity-logs", params)
if paginated_data:
print(json.dumps(paginated_data)) # Indexed as 'admin_activity_logs'
if __name__ == "__main__":
# Fetch logs based on config
fetch_user_activity_logs()
fetch_login_activity_logs()
fetch_system_activity_logs()
fetch_admin_activity_logs()