-
Notifications
You must be signed in to change notification settings - Fork 396
Expand file tree
/
Copy paths3.rs
More file actions
102 lines (90 loc) · 3.02 KB
/
s3.rs
File metadata and controls
102 lines (90 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use aligned_sdk::common::types::VerificationData;
use tracing::{info, warn};
#[derive(Debug)]
#[allow(dead_code)]
pub enum GetBatchProofsError {
FetchingS3Batch(String),
Deserialization(String),
EmptyBody(String),
StatusFailed((u16, String)),
ReqwestClientFailed(String),
}
// needed to make S3 bucket work
const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";
const MAX_BATCH_URLS: usize = 5;
// get_aligned_batch_from_s3_with_multiple_urls tries multiple comma-separated URLs until first successful response
pub async fn get_aligned_batch_from_s3_with_multiple_urls(
urls: String,
) -> Result<Vec<VerificationData>, GetBatchProofsError> {
// Parse comma-separated URLs and limit to max 5
let parsed_urls = parse_batch_urls(&urls);
info!(
"Getting batch from data service with {} URLs: {:?}",
parsed_urls.len(),
parsed_urls
);
let mut errors = Vec::new();
// Try each URL until first successful response
for url in parsed_urls.iter() {
match get_aligned_batch_from_s3(url.clone()).await {
Ok(data) => {
return Ok(data);
}
Err(err) => {
warn!("Failed to fetch batch from URL {}: {:?}", url, err);
errors.push(format!("URL {}: {:?}", url, err));
}
}
}
// All URLs failed
Err(GetBatchProofsError::FetchingS3Batch(format!(
"Failed to get batch from all URLs, errors: {}",
errors.join("; ")
)))
}
// parse_batch_urls parses comma-separated URLs and limits to max 5
fn parse_batch_urls(batch_urls: &str) -> Vec<String> {
let mut urls = Vec::new();
for url in batch_urls.split(',') {
let trimmed_url = url.trim();
if !trimmed_url.is_empty() {
urls.push(trimmed_url.to_string());
if urls.len() > MAX_BATCH_URLS {
break;
}
}
}
urls
}
pub async fn get_aligned_batch_from_s3(
url: String,
) -> Result<Vec<VerificationData>, GetBatchProofsError> {
info!("Fetching batch from S3 URL: {}", url);
let client = reqwest::Client::builder()
.user_agent(DEFAULT_USER_AGENT)
.build()
.map_err(|e| GetBatchProofsError::ReqwestClientFailed(e.to_string()))?;
let response = client
.get(url)
.send()
.await
.map_err(|e| GetBatchProofsError::FetchingS3Batch(e.to_string()))?;
if !response.status().is_success() {
return Err(GetBatchProofsError::StatusFailed((
response.status().as_u16(),
response
.status()
.canonical_reason()
.unwrap_or("")
.to_string(),
)));
}
let bytes = response
.bytes()
.await
.map_err(|e| GetBatchProofsError::EmptyBody(e.to_string()))?;
let bytes: &[u8] = bytes.iter().as_slice();
let data: Vec<VerificationData> = ciborium::from_reader(bytes)
.map_err(|e| GetBatchProofsError::Deserialization(e.to_string()))?;
Ok(data)
}