2 changes: 1 addition & 1 deletion Makefile
@@ -298,7 +298,7 @@ verify_aggregated_proof_risc0:
--rpc_url $(RPC_URL)

proof_aggregator_install: ## Install the aggregation mode with proving enabled
cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator --locked
cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator_gpu --locked

proof_aggregator_write_program_ids: ## Write proof aggregator zkvm programs ids
@cd aggregation_mode && ./scripts/build_programs.sh
28 changes: 26 additions & 2 deletions aggregation_mode/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions aggregation_mode/Cargo.toml
@@ -18,6 +18,7 @@ reqwest = { version = "0.12" }
ciborium = "=0.2.2"
lambdaworks-crypto = { git = "https://github.com/lambdaclass/lambdaworks.git", rev = "5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b", features = ["serde"]}
rayon = "1.10.0"
backon = "1.2.0"
# Necessary for the VerificationData type
aligned-sdk = { path = "../crates/sdk/" }
# zkvms
1 change: 1 addition & 0 deletions aggregation_mode/src/backend/mod.rs
@@ -1,6 +1,7 @@
pub mod config;
pub mod fetcher;
mod merkle_tree;
mod retry;
mod s3;
mod types;

62 changes: 62 additions & 0 deletions aggregation_mode/src/backend/retry.rs
@@ -0,0 +1,62 @@
use backon::ExponentialBuilder;
use backon::Retryable;
use std::{future::Future, time::Duration};

#[derive(Debug)]
pub enum RetryError<E> {
    Transient(E),
    Permanent(E),
}

impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            RetryError::Transient(e) => write!(f, "{}", e),
            RetryError::Permanent(e) => write!(f, "{}", e),
        }
    }
}

impl<E> RetryError<E> {
    pub fn inner(self) -> E {
        match self {
            RetryError::Transient(e) => e,
            RetryError::Permanent(e) => e,
        }
    }
}

impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}

/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
/// Runs with `jitter: false`.
///
/// # Parameters
/// * `function` - The async function to retry
/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds)
/// * `factor` - Exponential backoff multiplier for retry delays
/// * `max_times` - Maximum number of retry attempts
/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds)
pub async fn retry_function<FutureFn, Fut, T, E>(
    function: FutureFn,
    min_delay_millis: u64,
    factor: f32,
    max_times: usize,
    max_delay_seconds: u64,
) -> Result<T, RetryError<E>>
where
    Fut: Future<Output = Result<T, RetryError<E>>>,
    FutureFn: FnMut() -> Fut,
{
    let backoff = ExponentialBuilder::default()
        .with_min_delay(Duration::from_millis(min_delay_millis))
        .with_max_times(max_times)
        .with_factor(factor)
        .with_max_delay(Duration::from_secs(max_delay_seconds));

    function
        .retry(backoff)
        .sleep(tokio::time::sleep)
        .when(|e| matches!(e, RetryError::Transient(_)))
        .await
}
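
A minimal usage sketch of `retry_function` with a hypothetical caller (the `fetch_text` function and its error mapping are illustrative, not taken from the diff), assuming the `crate::backend::retry` module path that s3.rs uses below: connection errors are marked `Transient` so they are retried, while HTTP status errors are marked `Permanent` and fail immediately.

use crate::backend::retry::{retry_function, RetryError};

// Hypothetical caller: fetch a URL as text, retrying only transient failures.
async fn fetch_text(url: String) -> Result<String, reqwest::Error> {
    retry_function(
        move || {
            let url = url.clone();
            async move {
                reqwest::get(url.as_str())
                    .await
                    .map_err(RetryError::Transient)? // connection errors: retry
                    .error_for_status()
                    .map_err(RetryError::Permanent)? // HTTP status errors here: give up
                    .text()
                    .await
                    .map_err(RetryError::Transient) // body read errors: retry
            }
        },
        500, // min_delay_millis
        2.0, // factor
        5,   // max_times
        10,  // max_delay_seconds
    )
    .await
    .map_err(|e| e.inner())
}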
102 changes: 80 additions & 22 deletions aggregation_mode/src/backend/s3.rs
@@ -1,4 +1,7 @@
use crate::backend::retry::{retry_function, RetryError};
use aligned_sdk::common::types::VerificationData;
use std::time::Duration;
use tracing::{info, warn};

#[derive(Debug)]
#[allow(dead_code)]
@@ -13,38 +16,93 @@ pub enum GetBatchProofsError {
// needed to make S3 bucket work
const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";

pub async fn get_aligned_batch_from_s3(
// Retry parameters for S3 requests
/// Initial delay before first retry attempt (in milliseconds)
const RETRY_MIN_DELAY_MILLIS: u64 = 500;
/// Exponential backoff multiplier for retry delays
const RETRY_FACTOR: f32 = 2.0;
/// Maximum number of retry attempts
const RETRY_MAX_TIMES: usize = 5;
/// Maximum delay between retry attempts (in seconds)
const RETRY_MAX_DELAY_SECONDS: u64 = 10;

/// Timeout for establishing a connection to S3
const CONNECT_TIMEOUT_SECONDS: Duration = Duration::from_secs(10);
/// Timeout for Batch Download Requests
const BATCH_DOWNLOAD_TIMEOUT_SECONDS: Duration = Duration::from_secs(5 * 60);

async fn get_aligned_batch_from_s3_retryable(
    url: String,
) -> Result<Vec<VerificationData>, GetBatchProofsError> {
) -> Result<Vec<VerificationData>, RetryError<GetBatchProofsError>> {
    info!("Fetching batch from S3 URL: {}", url);
    let client = reqwest::Client::builder()
        .user_agent(DEFAULT_USER_AGENT)
        .connect_timeout(CONNECT_TIMEOUT_SECONDS)
        .timeout(BATCH_DOWNLOAD_TIMEOUT_SECONDS)
        .build()
        .map_err(|e| GetBatchProofsError::ReqwestClientFailed(e.to_string()))?;
        .map_err(|e| {
            RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string()))
        })?;

    let response = client.get(&url).send().await.map_err(|e| {
        warn!("Failed to send request to {}: {}", url, e);
        RetryError::Transient(GetBatchProofsError::FetchingS3Batch(e.to_string()))
    })?;

    let response = client
        .get(url)
        .send()
        .await
        .map_err(|e| GetBatchProofsError::FetchingS3Batch(e.to_string()))?;
    if !response.status().is_success() {
        return Err(GetBatchProofsError::StatusFailed((
            response.status().as_u16(),
            response
                .status()
                .canonical_reason()
                .unwrap_or("")
                .to_string(),
        )));
        let status_code = response.status().as_u16();
        let reason = response
            .status()
            .canonical_reason()
            .unwrap_or("")
            .to_string();

        // Determine if the error is retryable based on status code
        let error = GetBatchProofsError::StatusFailed((status_code, reason));
        return match status_code {
            // Client errors (4xx) are generally permanent, except for specific cases
            400..=499 => match status_code {
                408 | 429 => Err(RetryError::Transient(error)), // Request Timeout, Too Many Requests
                _ => Err(RetryError::Permanent(error)),
            },
            // Server errors (5xx) are generally transient
            500..=599 => Err(RetryError::Transient(error)),
            _ => Err(RetryError::Permanent(error)),
        };
    }

    let bytes = response
        .bytes()
        .await
        .map_err(|e| GetBatchProofsError::EmptyBody(e.to_string()))?;
    let bytes = response.bytes().await.map_err(|e| {
        warn!("Failed to read response body from {}: {}", url, e);
        RetryError::Transient(GetBatchProofsError::EmptyBody(e.to_string()))
    })?;
    let bytes: &[u8] = bytes.iter().as_slice();

    let data: Vec<VerificationData> = ciborium::from_reader(bytes)
        .map_err(|e| GetBatchProofsError::Deserialization(e.to_string()))?;
    let data: Vec<VerificationData> = ciborium::from_reader(bytes).map_err(|e| {
        warn!("Failed to deserialize batch data from {}: {}", url, e);
        RetryError::Permanent(GetBatchProofsError::Deserialization(e.to_string()))
    })?;

    Ok(data)
}

/// Download batch from Storage Service using the provided URL.
///
/// Retries on recoverable errors using exponential backoff up to `RETRY_MAX_TIMES` times:
/// (0.5 secs - 1 sec - 2 secs - 4 secs - 8 secs).
pub async fn get_aligned_batch_from_s3(
    url: String,
) -> Result<Vec<VerificationData>, GetBatchProofsError> {
    let url_clone = url.clone();
    retry_function(
        move || {
            let url = url_clone.clone();
            get_aligned_batch_from_s3_retryable(url)
        },
        RETRY_MIN_DELAY_MILLIS,
        RETRY_FACTOR,
        RETRY_MAX_TIMES,
        RETRY_MAX_DELAY_SECONDS,
    )
    .await
    .map_err(|retry_err| retry_err.inner())
}
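
With jitter disabled, the delay before retry n is min_delay × factor^n, capped at max_delay, which for the constants above gives the 0.5 s / 1 s / 2 s / 4 s / 8 s schedule noted in the doc comment. A small standalone sketch (illustrative only, not part of the module) that reproduces that arithmetic:

// Illustrative only: the jitter-free backoff schedule implied by
// RETRY_MIN_DELAY_MILLIS = 500, RETRY_FACTOR = 2.0, RETRY_MAX_TIMES = 5,
// RETRY_MAX_DELAY_SECONDS = 10.
fn main() {
    let (min_delay_s, factor, max_times, max_delay_s) = (0.5f32, 2.0f32, 5usize, 10.0f32);
    let delays: Vec<f32> = (0..max_times)
        .map(|n| (min_delay_s * factor.powi(n as i32)).min(max_delay_s))
        .collect();
    println!("{:?}", delays); // [0.5, 1.0, 2.0, 4.0, 8.0]
}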