Skip to content

Commit a2ba510

Browse files
authored
Merge pull request #15 from ashutoshrana171/master
Create process for data before 2021-07-28
2 parents ced2ffc + c795cc5 commit a2ba510

File tree

1 file changed

+154
-0
lines changed

1 file changed

+154
-0
lines changed

process_20210728.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
from json import dumps
2+
from pathlib import Path
3+
from datetime import datetime, timedelta, timezone
4+
from os import environ
5+
from requests import post
6+
import threading
7+
import pandas as pd
8+
9+
# Base URL and API key come from the environment so deployments can point at
# a different RegAlytics endpoint / credential without code changes.
URL = environ.get("REGALYTICS_API_BASE_URL", "https://api.regalytics.ai/api/v3")
API_KEY = environ.get("REGALYTICS_API_KEY", "")

# objectives: download data from API -> temp folder or in memory. Output processed data to /temp-output-directory/alternative/regalytics/articles/yyyyMMdd.json
ARTICLE_PATH = Path('/temp-output-directory/alternative/regalytics/articles')
ARTICLE_PATH.mkdir(parents=True, exist_ok=True)
# Accumulates processed articles keyed by 'YYYYMMDD' across every process() call;
# each call rewrites all accumulated date files from this dict.
articles_by_date = {}
# using a 'sourced_at' payload from 2020-01-01 (start of data) to 2021-07-27, as the RegAlytics API has the 'created_at' field for all articles mapped to 2021-07-28 (inception date of the RegAlytics API)

start_date = datetime(2020,1,1)
end_date = datetime(2021,7,27)
# NOTE(review): referenced via `global` in process() but never incremented — appears dead.
none_sourced_at_date_count = 0
21+
22+
def get_data_from_source(process_date):
    """Fetch every page of RegAlytics articles sourced on *process_date*.

    Parameters
    ----------
    process_date : str
        Date in ``YYYY-MM-DD`` format, used as both the start and end of
        the ``sourced_at`` search window.

    Returns
    -------
    tuple
        ``(total_results, all_responses)`` where ``total_results`` is the
        count reported by the API and ``all_responses`` is the list of raw
        page payloads ordered by ``page_number``.
    """
    payload = dumps({
        "apikey": API_KEY,
        "search_options": {
            "sourced_at": {
                "start": process_date,
                "end": process_date
            }
        },
    })

    headers = {'Content-Type': 'application/json'}

    def get_page(p):
        # Fix: build the request from the configurable URL constant instead of a
        # hard-coded host, so REGALYTICS_API_BASE_URL is honoured. The default
        # value of URL matches the previously hard-coded endpoint exactly.
        # list.append is atomic in CPython, so concurrent appends are safe here.
        page = post(f"{URL}/search?page={p}", headers=headers, data=payload).json()
        all_responses.append(page)

    # Fetch page 1 synchronously to learn the total page count.
    page_1 = post(f"{URL}/search", headers=headers, data=payload).json()
    all_responses = [page_1]

    if page_1['total_pages'] == 1:
        return page_1['total_results'], all_responses

    # Fetch the remaining pages concurrently; requests releases the GIL
    # during network waits, so threads overlap the I/O.
    threads = [threading.Thread(target=get_page, args=(p,))
               for p in range(2, page_1['total_pages'] + 1)]

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    # Threads complete in arbitrary order; restore page order for callers.
    all_responses.sort(key=lambda x: x['page_number'])

    return page_1['total_results'], all_responses
58+
59+
# "agencies": [
60+
# {
61+
# "name": "Iowa Department of Human Services",
62+
# "state": [
63+
# {
64+
# "name": "Iowa"
65+
# }
66+
# ],
67+
# "country": [
68+
# {
69+
# "name": "United States"
70+
# }
71+
# ]
72+
# }
73+
# ]
74+
# if states is more than 0
75+
# loop into state and get the state name
76+
# 1. query all data, -> /api/v2/.../get-all; 2. look at latest_update, add delta of 1/2 days;
77+
# 3. write data to date of latest_update + delta. This date must be on the date we published the article on Regalytics
78+
79+
def process(process_date):
    """Download, transform and persist all RegAlytics articles sourced on
    *process_date* (a ``YYYY-MM-DD`` string).

    Articles are flattened (agency/state/country structure reshaped), dated
    one business day after their ``sourced_at`` date, grouped into the
    module-level ``articles_by_date`` dict, and every accumulated date
    bucket is rewritten to ``ARTICLE_PATH/<YYYYMMDD>.json`` as
    line-delimited JSON.
    """
    # NOTE(review): declared global but never assigned in this function — dead.
    global none_sourced_at_date_count
    total_results, all_responses = get_data_from_source(process_date)
    if total_results == 0: return

    try:
        for response in all_responses:
            for article in response.get('results', []):
                # Normalise the free-text 'yes'/'no' flag into a real bool.
                article['in_federal_register'] = 'yes' in article['in_federal_register'].lower()
                # State -> Dictionary<string, List<string>>
                # i.e. map each country name to the list of its state names.
                states = {}
                agencies = article.get('agencies', [])
                if not agencies:
                    # Guard against an explicit null from the API.
                    agencies = []
                for agency in agencies:
                    state = agency.get('state')
                    if not state:
                        continue

                    for country in agency.get('country', []):
                        name = country['name']

                        if not name in states:
                            country_states = []
                            states[name] = country_states
                        else:
                            country_states = states[name]

                        country_states.extend([x['name'] for x in state])

                # Replace the nested agency objects with flat name lists.
                article['states'] = states
                article['agencies'] = [agency['name'] for agency in agencies]

                # Searching by `created_at` returns everything with a UTC time
                # between 00:00-23:59 of a single day, so it includes articles
                # created 20:00-00:00 EST of the "previous day" (-04:00).
                # Normalise `created_at` to a UTC timestamp to avoid
                # overwriting the previous day's file: rebuild it from
                # `sourced_at` shifted forward by one business day.
                manipulated_sourced_at = f"{article['sourced_at'][:10]}T00:00:00+00:00"
                temp_sourced_at = datetime.fromisoformat(manipulated_sourced_at)
                # BDay() advances to the next business day (skips weekends).
                temp_sourced_at+= pd.offsets.BDay()
                manipulated_sourced_at = temp_sourced_at.isoformat()
                print(manipulated_sourced_at)
                # Drop the colon in the offset: %z only accepts `-0400` instead of `-04:00` in Python 3.6
                article['created_at'] = manipulated_sourced_at[:-3] + manipulated_sourced_at[-2:]

                try:
                    created_at = datetime.strptime(article['created_at'], '%Y-%m-%dT%H:%M:%S.%f%z').astimezone(timezone.utc)
                # NOTE(review): bare except — intended for ValueError only; confirm.
                except:
                    created_at = datetime.strptime(article['created_at'], '%Y-%m-%dT%H:%M:%S%z').astimezone(timezone.utc) # sourced_at sometimes has a different format

                article['created_at'] = created_at.strftime('%Y-%m-%dT%H:%M:%S.%f')
                date_key = created_at.strftime('%Y%m%d')

                # Append into the module-level accumulator, creating the
                # per-date bucket on first sight.
                if date_key not in articles_by_date:
                    date_articles = []
                    articles_by_date[date_key] = date_articles
                else:
                    date_articles = articles_by_date[date_key]

                date_articles.append(article)

        # NOTE(review): `date_articles` only refers to the last bucket touched,
        # and buckets accumulate across calls, so this check can misfire when a
        # day's articles span multiple date keys — verify intent.
        if total_results!=len(date_articles):
            print(f"Data mismatch on {process_date}")

        # Rewrite every accumulated date file as newline-delimited JSON.
        for date, articles in articles_by_date.items():
            with open(ARTICLE_PATH / f'{date}.json', 'w') as article_file:
                article_lines = '\n'.join([dumps(article, indent=None) for article in articles])
                article_file.write(article_lines)
    except Exception as e:
        # Best-effort batch job: log and continue with the next date.
        print(f"Error {e} on {process_date}")
148+
149+
if __name__ == "__main__":
    # Walk every calendar day from start_date through end_date inclusive,
    # processing each one as a 'YYYY-MM-DD' string.
    total_days = (end_date - start_date).days
    for offset in range(total_days + 1):
        day = start_date + timedelta(days=offset)
        process(day.strftime('%Y-%m-%d'))

0 commit comments

Comments
 (0)