-
Notifications
You must be signed in to change notification settings - Fork 396
Expand file tree
/
Copy paths3.rs
More file actions
108 lines (96 loc) · 3.84 KB
/
s3.rs
File metadata and controls
108 lines (96 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use crate::backend::retry::{retry_function, RetryError};
use aligned_sdk::common::types::VerificationData;
use std::time::Duration;
use tracing::{info, warn};
/// Errors that can occur while downloading and CBOR-decoding a proof batch
/// from the S3-compatible storage service.
#[derive(Debug)]
#[allow(dead_code)] // variants carry diagnostic payloads that are not always matched on
pub enum GetBatchProofsError {
    /// The HTTP request could not be sent (network / transport failure).
    FetchingS3Batch(String),
    /// The response body was fetched but could not be CBOR-decoded.
    Deserialization(String),
    /// The response body could not be read to completion.
    EmptyBody(String),
    /// The server answered with a non-success status: (status code, canonical reason).
    StatusFailed((u16, String)),
    /// The `reqwest` HTTP client itself failed to build.
    ReqwestClientFailed(String),
}
// needed to make S3 bucket work
const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";

// Retry parameters for S3 requests
/// Initial delay before first retry attempt (in milliseconds)
const RETRY_MIN_DELAY_MILLIS: u64 = 500;
/// Exponential backoff multiplier for retry delays
const RETRY_FACTOR: f32 = 2.0;
/// Maximum number of retry attempts
const RETRY_MAX_TIMES: usize = 5;
/// Maximum delay between retry attempts (in seconds)
const RETRY_MAX_DELAY_SECONDS: u64 = 10;

/// Timeout for establishing a connection to S3.
/// NOTE(review): despite the `_SECONDS` suffix this is a `Duration`, not a raw count.
const CONNECT_TIMEOUT_SECONDS: Duration = Duration::from_secs(10);
/// Timeout for Batch Download Requests (whole request, including body transfer).
/// NOTE(review): also a `Duration` despite the `_SECONDS` suffix.
const BATCH_DOWNLOAD_TIMEOUT_SECONDS: Duration = Duration::from_secs(5 * 60);
async fn get_aligned_batch_from_s3_retryable(
url: String,
) -> Result<Vec<VerificationData>, RetryError<GetBatchProofsError>> {
info!("Fetching batch from S3 URL: {}", url);
let client = reqwest::Client::builder()
.user_agent(DEFAULT_USER_AGENT)
.connect_timeout(CONNECT_TIMEOUT_SECONDS)
.timeout(BATCH_DOWNLOAD_TIMEOUT_SECONDS)
.build()
.map_err(|e| {
RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string()))
})?;
let response = client.get(&url).send().await.map_err(|e| {
warn!("Failed to send request to {}: {}", url, e);
RetryError::Transient(GetBatchProofsError::FetchingS3Batch(e.to_string()))
})?;
if !response.status().is_success() {
let status_code = response.status().as_u16();
let reason = response
.status()
.canonical_reason()
.unwrap_or("")
.to_string();
// Determine if the error is retryable based on status code
let error = GetBatchProofsError::StatusFailed((status_code, reason));
return match status_code {
// Client errors (4xx) are generally permanent, except for specific cases
400..=499 => match status_code {
408 | 429 => Err(RetryError::Transient(error)), // Request Timeout, Too Many Requests
_ => Err(RetryError::Permanent(error)),
},
// Server errors (5xx) are generally transient
500..=599 => Err(RetryError::Transient(error)),
_ => Err(RetryError::Permanent(error)),
};
}
let bytes = response.bytes().await.map_err(|e| {
warn!("Failed to read response body from {}: {}", url, e);
RetryError::Transient(GetBatchProofsError::EmptyBody(e.to_string()))
})?;
let bytes: &[u8] = bytes.iter().as_slice();
let data: Vec<VerificationData> = ciborium::from_reader(bytes).map_err(|e| {
warn!("Failed to deserialize batch data from {}: {}", url, e);
RetryError::Permanent(GetBatchProofsError::Deserialization(e.to_string()))
})?;
Ok(data)
}
/// Download batch from Storage Service using the provided URL.
///
/// Retries on recoverable errors using exponential backoff up to `RETRY_MAX_TIMES` times:
/// (0.5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
///
/// Returns the decoded batch on success, or the last underlying
/// [`GetBatchProofsError`] once retries are exhausted or a permanent error
/// is encountered.
pub async fn get_aligned_batch_from_s3(
    url: String,
) -> Result<Vec<VerificationData>, GetBatchProofsError> {
    // `url` is owned, so the closure can capture it directly; it is cloned
    // once per retry attempt. The previous extra up-front clone was redundant.
    retry_function(
        move || get_aligned_batch_from_s3_retryable(url.clone()),
        RETRY_MIN_DELAY_MILLIS,
        RETRY_FACTOR,
        RETRY_MAX_TIMES,
        RETRY_MAX_DELAY_SECONDS,
    )
    .await
    // Unwrap the Transient/Permanent classification back to the plain error.
    .map_err(|retry_err| retry_err.inner())
}