Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ crates/cli/batch_inclusion_responses/*
**/aligned_verification_data
**/broadcast
volume
volume2
config-files/*.last_processed_batch.json
config-files/*.last_aggregated_block.json

Expand Down
17 changes: 9 additions & 8 deletions aggregation_mode/src/backend/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
risc0_aggregator::Risc0ProofReceiptAndImageId, sp1_aggregator::SP1ProofWithPubValuesAndElf,
AlignedProof, ZKVMEngine,
},
backend::s3::get_aligned_batch_from_s3,
backend::s3::get_aligned_batch_from_s3_with_multiple_urls,
};
use aligned_sdk::common::types::ProvingSystemId;
use alloy::{
Expand Down Expand Up @@ -97,13 +97,14 @@ impl ProofsFetcher {
);

// Download batch proofs from s3
let data = match get_aligned_batch_from_s3(batch.batchDataPointer).await {
Ok(data) => data,
Err(err) => {
error!("Error while downloading proofs from s3. Err {:?}", err);
continue;
}
};
let data =
match get_aligned_batch_from_s3_with_multiple_urls(batch.batchDataPointer).await {
Ok(data) => data,
Err(err) => {
error!("Error while downloading proofs from s3. Err {:?}", err);
continue;
}
};

info!("Data downloaded from S3, number of proofs {}", data.len());

Expand Down
52 changes: 52 additions & 0 deletions aggregation_mode/src/backend/s3.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use aligned_sdk::common::types::VerificationData;
use tracing::{info, warn};

#[derive(Debug)]
#[allow(dead_code)]
Expand All @@ -12,10 +13,61 @@ pub enum GetBatchProofsError {

// needed to make S3 bucket work
const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";
const MAX_BATCH_URLS: usize = 5;

// get_aligned_batch_from_s3_with_multiple_urls tries multiple comma-separated URLs until first successful response
// get_aligned_batch_from_s3_with_multiple_urls tries multiple comma-separated URLs until first successful response
pub async fn get_aligned_batch_from_s3_with_multiple_urls(
    urls: String,
) -> Result<Vec<VerificationData>, GetBatchProofsError> {
    // Split the comma-separated list into individual candidate URLs (capped at MAX_BATCH_URLS).
    let candidate_urls = parse_batch_urls(&urls);
    info!(
        "Getting batch from data service with {} URLs: {:?}",
        candidate_urls.len(),
        candidate_urls
    );

    // Record one message per failed attempt so the final error carries every cause.
    let mut failures: Vec<String> = Vec::with_capacity(candidate_urls.len());

    // Stop at the first URL that yields a batch; fall through on failure.
    for candidate in &candidate_urls {
        let attempt = get_aligned_batch_from_s3(candidate.clone()).await;
        match attempt {
            Ok(batch) => return Ok(batch),
            Err(err) => {
                warn!("Failed to fetch batch from URL {}: {:?}", candidate, err);
                failures.push(format!("URL {}: {:?}", candidate, err));
            }
        }
    }

    // Every candidate URL failed: surface all accumulated causes at once.
    Err(GetBatchProofsError::FetchingS3Batch(format!(
        "Failed to get batch from all URLs, errors: {}",
        failures.join("; ")
    )))
}

// parse_batch_urls parses comma-separated URLs and limits to max 5
// parse_batch_urls parses a comma-separated URL list and limits the result to MAX_BATCH_URLS.
// Entries are trimmed and empty entries (e.g. from "a,,b" or trailing commas) are dropped.
//
// Fix: the previous guard checked `urls.len() > MAX_BATCH_URLS` *after* pushing,
// so it admitted one URL too many (up to MAX_BATCH_URLS + 1). `take` enforces
// the cap exactly.
fn parse_batch_urls(batch_urls: &str) -> Vec<String> {
    batch_urls
        .split(',')
        .map(str::trim)
        .filter(|url| !url.is_empty())
        .take(MAX_BATCH_URLS)
        .map(str::to_string)
        .collect()
}

pub async fn get_aligned_batch_from_s3(
url: String,
) -> Result<Vec<VerificationData>, GetBatchProofsError> {
info!("Fetching batch from S3 URL: {}", url);
let client = reqwest::Client::builder()
.user_agent(DEFAULT_USER_AGENT)
.build()
Expand Down
10 changes: 10 additions & 0 deletions crates/batcher/.env.dev
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
# Main S3 bucket configuration
AWS_SECRET_ACCESS_KEY=test
AWS_REGION=us-east-2
AWS_ACCESS_KEY_ID=test
AWS_BUCKET_NAME=aligned.storage
UPLOAD_ENDPOINT=http://localhost:4566
DOWNLOAD_ENDPOINT=http://localhost:4566/aligned.storage

# Secondary S3 bucket configuration
AWS_SECRET_ACCESS_KEY_SECONDARY=test2
AWS_REGION_SECONDARY=us-west-1
AWS_ACCESS_KEY_ID_SECONDARY=test2
AWS_BUCKET_NAME_SECONDARY=aligned.storage
UPLOAD_ENDPOINT_SECONDARY=http://localhost:4567
DOWNLOAD_ENDPOINT_SECONDARY=http://localhost:4567/aligned.storage

RUST_LOG=info
RUST_BACKTRACE=1
10 changes: 10 additions & 0 deletions crates/batcher/.env.docker
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
# Main S3 bucket configuration
AWS_SECRET_ACCESS_KEY=test
AWS_REGION=us-east-2
AWS_ACCESS_KEY_ID=test
AWS_BUCKET_NAME=aligned.storage
UPLOAD_ENDPOINT=http://localstack:4566
DOWNLOAD_ENDPOINT=http://localstack:4566/aligned.storage

# Secondary S3 bucket configuration
AWS_SECRET_ACCESS_KEY_SECONDARY=test2
AWS_REGION_SECONDARY=us-west-1
AWS_ACCESS_KEY_ID_SECONDARY=test2
AWS_BUCKET_NAME_SECONDARY=aligned.storage
UPLOAD_ENDPOINT_SECONDARY=http://localstack2:4567
DOWNLOAD_ENDPOINT_SECONDARY=http://localstack2:4567/aligned.storage

RUST_LOG=info
RUST_BACKTRACE=1
10 changes: 10 additions & 0 deletions crates/batcher/.env.example
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
# Main S3 bucket configuration
AWS_SECRET_ACCESS_KEY=<secret_access_key>
AWS_REGION=<region>
AWS_ACCESS_KEY_ID=<access_key_id>
AWS_BUCKET_NAME=<bucket_name>
UPLOAD_ENDPOINT=<upload_endpoint. For non-aws storage>
DOWNLOAD_ENDPOINT=<download_endpoint>

# Secondary S3 bucket configuration
AWS_SECRET_ACCESS_KEY_SECONDARY=<secondary_secret_access_key>
AWS_REGION_SECONDARY=<secondary_region>
AWS_ACCESS_KEY_ID_SECONDARY=<secondary_access_key_id>
AWS_BUCKET_NAME_SECONDARY=<secondary_bucket_name>
UPLOAD_ENDPOINT_SECONDARY=<secondary_upload_endpoint. For non-aws storage>
DOWNLOAD_ENDPOINT_SECONDARY=<secondary_download_endpoint>

RUST_LOG=<log_level> # info, debug, error, warn, etc.
125 changes: 110 additions & 15 deletions crates/batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub struct Batcher {
s3_client: S3Client,
s3_bucket_name: String,
download_endpoint: String,
s3_client_secondary: S3Client,
s3_bucket_name_secondary: String,
download_endpoint_secondary: String,
eth_ws_url: String,
eth_ws_url_fallback: String,
batcher_signer: Arc<SignerMiddlewareT>,
Expand Down Expand Up @@ -106,15 +109,36 @@ impl Batcher {
dotenv().ok();

// https://docs.aws.amazon.com/sdk-for-rust/latest/dg/localstack.html
let upload_endpoint = env::var("UPLOAD_ENDPOINT").ok();
// Primary S3 configuration
let s3_config_primary = s3::S3Config {
access_key_id: env::var("AWS_ACCESS_KEY_ID").ok(),
secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").ok(),
region: env::var("AWS_REGION").ok(),
endpoint_url: env::var("UPLOAD_ENDPOINT").ok(),
};

let s3_bucket_name =
env::var("AWS_BUCKET_NAME").expect("AWS_BUCKET_NAME not found in environment");

let download_endpoint =
env::var("DOWNLOAD_ENDPOINT").expect("DOWNLOAD_ENDPOINT not found in environment");

let s3_client = s3::create_client(upload_endpoint).await;
let s3_client = s3::create_client(s3_config_primary).await;

// Secondary S3 configuration
let s3_config_secondary = s3::S3Config {
access_key_id: env::var("AWS_ACCESS_KEY_ID_SECONDARY").ok(),
secret_access_key: env::var("AWS_SECRET_ACCESS_KEY_SECONDARY").ok(),
region: env::var("AWS_REGION_SECONDARY").ok(),
endpoint_url: env::var("UPLOAD_ENDPOINT_SECONDARY").ok(),
};

let s3_bucket_name_secondary = env::var("AWS_BUCKET_NAME_SECONDARY")
.expect("AWS_BUCKET_NAME_SECONDARY not found in environment");
let download_endpoint_secondary = env::var("DOWNLOAD_ENDPOINT_SECONDARY")
.expect("DOWNLOAD_ENDPOINT_SECONDARY not found in environment");

let s3_client_secondary = s3::create_client(s3_config_secondary).await;

let config = ConfigFromYaml::new(config_file);
// Ensure max_batch_bytes_size can at least hold one proof of max_proof_size,
Expand Down Expand Up @@ -252,6 +276,9 @@ impl Batcher {
s3_client,
s3_bucket_name,
download_endpoint,
s3_client_secondary,
s3_bucket_name_secondary,
download_endpoint_secondary,
eth_ws_url: config.eth_ws_url,
eth_ws_url_fallback: config.eth_ws_url_fallback,
batcher_signer,
Expand Down Expand Up @@ -1541,7 +1568,18 @@ impl Batcher {
let batch_merkle_root_hex = hex::encode(batch_merkle_root);
info!("Batch merkle root: 0x{}", batch_merkle_root_hex);
let file_name = batch_merkle_root_hex.clone() + ".json";
let batch_data_pointer: String = "".to_owned() + &self.download_endpoint + "/" + &file_name;

let batch_data_pointer = self
.upload_batch_to_multiple_s3(batch_bytes, &file_name)
.await?;
if let Err(e) = self
.telemetry
.task_uploaded_to_s3(&batch_merkle_root_hex)
.await
{
warn!("Failed to send task status to telemetry: {:?}", e);
};
info!("Batch upload to: {}", batch_data_pointer);

let num_proofs_in_batch = leaves.len();
let gas_per_proof = (self.constant_gas_cost()
Expand Down Expand Up @@ -1577,16 +1615,6 @@ impl Batcher {
.gas_price_used_on_latest_batch
.set(gas_price.as_u64() as i64);

info!("Uploading batch to S3...");
self.upload_batch_to_s3(batch_bytes, &file_name).await?;
if let Err(e) = self
.telemetry
.task_uploaded_to_s3(&batch_merkle_root_hex)
.await
{
warn!("Failed to send task status to telemetry: {:?}", e);
};
info!("Batch sent to S3 with name: {}", file_name);
if let Err(e) = self
.telemetry
.task_created(
Expand Down Expand Up @@ -1857,22 +1885,89 @@ impl Batcher {
unlocked
}

/// Uploads the batch to both S3 buckets and returns the comma-separated URLs of successful uploads.
/// Returns an error only if all uploads fail.
async fn upload_batch_to_multiple_s3(
&self,
batch_bytes: &[u8],
file_name: &str,
) -> Result<String, BatcherError> {
// Upload to both S3 buckets and collect successful URLs
let mut successful_urls = Vec::new();

// Try primary S3 upload
if self
.upload_batch_to_s3(
&self.s3_client,
batch_bytes,
file_name,
&self.s3_bucket_name,
)
.await
.is_ok()
{
let primary_url = format!("{}/{}", self.download_endpoint, file_name);
successful_urls.push(primary_url.clone());
info!("Successfully uploaded batch to primary S3: {}", primary_url);
} else {
warn!("Failed to upload batch to primary S3");
}

// Try secondary S3 upload
if self
.upload_batch_to_s3(
&self.s3_client_secondary,
batch_bytes,
file_name,
&self.s3_bucket_name_secondary,
)
.await
.is_ok()
{
let secondary_url = format!("{}/{}", self.download_endpoint_secondary, file_name);
successful_urls.push(secondary_url.clone());
info!(
"Successfully uploaded batch to secondary S3: {}",
secondary_url
);
} else {
warn!("Failed to upload batch to secondary S3");
}

// Update metrics with number of available data services
self.metrics
.available_data_services
.set(successful_urls.len() as i64);

// If no uploads succeeded, return error
if successful_urls.is_empty() {
error!("Failed to upload batch to both S3 buckets");
return Err(BatcherError::BatchUploadError(
"Failed to upload to any S3 bucket".to_string(),
));
}

Ok(successful_urls.join(","))
}

/// Uploads the batch to s3.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
async fn upload_batch_to_s3(
&self,
s3_client: &S3Client,
batch_bytes: &[u8],
file_name: &str,
bucket_name: &str,
) -> Result<(), BatcherError> {
let start = Instant::now();
let result = retry_function(
|| {
Self::upload_batch_to_s3_retryable(
batch_bytes,
file_name,
self.s3_client.clone(),
&self.s3_bucket_name,
s3_client.clone(),
bucket_name,
)
},
ETHEREUM_CALL_MIN_RETRY_DELAY,
Expand Down
7 changes: 7 additions & 0 deletions crates/batcher/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct BatcherMetrics {
pub cancel_create_new_task_duration: IntGauge,
pub batcher_gas_cost_create_task_total: GenericCounter<AtomicF64>,
pub batcher_gas_cost_cancel_task_total: GenericCounter<AtomicF64>,
pub available_data_services: IntGauge,
}

impl BatcherMetrics {
Expand Down Expand Up @@ -79,6 +80,10 @@ impl BatcherMetrics {
"batcher_gas_cost_cancel_task_total",
"Batcher Gas Cost Cancel Task Total"
))?;
let available_data_services = register_int_gauge!(opts!(
"available_data_services",
"Number of available data services (0-2)"
))?;

registry.register(Box::new(open_connections.clone()))?;
registry.register(Box::new(received_proofs.clone()))?;
Expand All @@ -96,6 +101,7 @@ impl BatcherMetrics {
registry.register(Box::new(cancel_create_new_task_duration.clone()))?;
registry.register(Box::new(batcher_gas_cost_create_task_total.clone()))?;
registry.register(Box::new(batcher_gas_cost_cancel_task_total.clone()))?;
registry.register(Box::new(available_data_services.clone()))?;

let metrics_route = warp::path!("metrics")
.and(warp::any().map(move || registry.clone()))
Expand Down Expand Up @@ -124,6 +130,7 @@ impl BatcherMetrics {
cancel_create_new_task_duration,
batcher_gas_cost_create_task_total,
batcher_gas_cost_cancel_task_total,
available_data_services,
})
}

Expand Down
Loading
Loading