-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_fetcher.py
More file actions
210 lines (176 loc) · 7.09 KB
/
data_fetcher.py
File metadata and controls
210 lines (176 loc) · 7.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import requests
import pandas as pd
def fetch_streamflow_data(state_code='TN'):
    """
    Fetch current instantaneous streamflow data from the USGS NWIS service.

    Parameters:
    - state_code: Two-letter state abbreviation (default is 'TN' for Tennessee).

    Returns:
    - dict: Parsed JSON response containing streamflow time-series data.

    Raises:
    - Exception: If the HTTP request does not return status 200.
    """
    # Base URL for the NWIS instantaneous-values API
    base_url = "https://waterservices.usgs.gov/nwis/iv/"
    # Parameters for the API request
    params = {
        'format': 'json',        # Response format
        'stateCd': state_code,   # State code
        'siteStatus': 'active',  # Only fetch active sites
        'parameterCd': '00060',  # Parameter code for streamflow
    }
    # Send the request to the USGS NWIS service
    response = requests.get(base_url, params=params)
    # Check if the request was successful
    if response.status_code == 200:
        return response.json()
    raise Exception(f"Error fetching data: {response.status_code} - {response.text}")
def fetch_sites_by_bbox(min_lon, min_lat, max_lon, max_lat):
    """
    Fetch active streamflow sites within a bounding box.

    Parameters:
    - min_lon, min_lat, max_lon, max_lat: Coordinates defining the bounding box.

    Returns:
    - List of dicts with keys 'name', 'code', 'lat', 'lon'; empty list when
      no sites are found or the request fails (failure is printed, not raised).
    """
    base_url = "https://waterservices.usgs.gov/nwis/iv/"
    params = {
        'format': 'json',
        # NWIS requires the bbox as "west,south,east,north" with <= 7 decimals
        'bBox': f"{min_lon:.6f},{min_lat:.6f},{max_lon:.6f},{max_lat:.6f}",
        'siteStatus': 'active',
        'parameterCd': '00060'
    }
    response = requests.get(base_url, params=params)
    if response.status_code != 200:
        # It's possible no sites are found or the box is too big/small,
        # just return an empty list and log the error
        print(f"Error fetching sites by bbox: {response.status_code} - {response.text}")
        return []
    data = response.json()
    sites = []
    if 'value' in data and 'timeSeries' in data['value']:
        # Use a set to track unique site codes to avoid duplicates
        # if the API returns them
        seen_sites = set()
        for series in data['value']['timeSeries']:
            source_info = series.get('sourceInfo', {})
            site_code = source_info.get('siteCode', [{}])[0].get('value')
            if site_code and site_code not in seen_sites:
                site_name = source_info.get('siteName', 'Unknown Site')
                geo_loc = source_info.get('geoLocation', {}).get('geogLocation', {})
                lat = geo_loc.get('latitude')
                lon = geo_loc.get('longitude')
                # Fix: compare against None — a plain truthiness test would
                # drop valid sites at latitude 0 or longitude 0
                if lat is not None and lon is not None:
                    sites.append({
                        'name': site_name,
                        'code': site_code,
                        'lat': lat,
                        'lon': lon
                    })
                    seen_sites.add(site_code)
    return sites
def fetch_historical_streamflow_data(site_number, start_date, end_date):
    """
    Fetch historical daily streamflow data from the USGS NWIS service.

    Parameters:
    - site_number: USGS site number for the streamgage.
    - start_date: Start date in the format 'YYYY-MM-DD'.
    - end_date: End date in the format 'YYYY-MM-DD'.

    Returns:
    - DataFrame with columns ['date_time', 'streamflow_cfs']; empty (but with
      the expected columns) when the service returns no observations.

    Raises:
    - Exception: If the HTTP request does not return status 200.
    """
    base_url = "https://waterservices.usgs.gov/nwis/dv/"
    # Parameters for the API request
    params = {
        'format': 'json',
        'sites': site_number,
        'startDT': start_date,
        'endDT': end_date,
        'parameterCd': '00060',
        'siteStatus': 'active'
    }
    response = requests.get(base_url, params=params)
    if response.status_code != 200:
        raise Exception(f"Error fetching historical data: {response.status_code} - {response.text}")
    data = response.json()
    # Guard against responses with no time series, consistent with
    # fetch_realtime_streamflow_data — previously an empty response
    # would raise KeyError instead of returning an empty frame.
    if 'value' not in data or not data['value'].get('timeSeries'):
        return pd.DataFrame(columns=['date_time', 'streamflow_cfs'])
    # Each series nests observations as:
    #   "values": [{"value": [{"value": "335",
    #                          "qualifiers": ["A"],
    #                          "dateTime": "2020-01-01T00:00:00.000"}, ...]}]
    records = []
    for series in data['value']['timeSeries']:
        for occurrence in series['values'][0]["value"]:
            records.append({
                'date_time': occurrence["dateTime"],
                'streamflow_cfs': occurrence["value"]
            })
    if not records:
        # Ensure columns exist even if empty
        return pd.DataFrame(columns=['date_time', 'streamflow_cfs'])
    df = pd.DataFrame(records)
    df['date_time'] = pd.to_datetime(df['date_time'])
    return df
def fetch_realtime_streamflow_data(site_number, lookback_days=7):
    """
    Fetch real-time instantaneous streamflow data from the USGS NWIS service.

    Parameters:
    - site_number: USGS site number for the streamgage.
    - lookback_days: Number of days to look back for data.

    Returns:
    - DataFrame with columns ['date_time', 'streamflow_cfs']; empty (with the
      expected columns) when the service reports no time series.

    Raises:
    - Exception: If the HTTP request does not return status 200.
    """
    base_url = "https://waterservices.usgs.gov/nwis/iv/"
    query = {
        'format': 'json',
        'sites': site_number,
        # Lookback expressed as an ISO8601 duration (e.g. P7D)
        'period': f"P{lookback_days}D",
        'parameterCd': '00060',  # Streamflow
        'siteStatus': 'active'
    }
    response = requests.get(base_url, params=query)
    if response.status_code != 200:
        raise Exception(f"Error fetching real-time data: {response.status_code} - {response.text}")
    payload = response.json()
    if 'value' not in payload or not payload['value'].get('timeSeries'):
        return pd.DataFrame(columns=['date_time', 'streamflow_cfs'])
    # Flatten every non-empty series into one row per observation,
    # skipping series whose values list is missing or empty.
    rows = [
        {'date_time': point["dateTime"], 'streamflow_cfs': point["value"]}
        for series in payload['value']['timeSeries']
        if series['values'] and series['values'][0]['value']
        for point in series['values'][0]["value"]
    ]
    frame = pd.DataFrame(rows)
    if frame.empty:
        return pd.DataFrame(columns=['date_time', 'streamflow_cfs'])
    # Parse as UTC, then strip timezone awareness for consistency with the
    # rest of the app (feature_engineering.py just uses pd.to_datetime).
    frame['date_time'] = pd.to_datetime(frame['date_time'], utc=True).dt.tz_localize(None)
    return frame