diff --git a/Makefile b/Makefile index ee26968cfc..449e1c4528 100644 --- a/Makefile +++ b/Makefile @@ -688,7 +688,7 @@ BURST_TIME_SECS ?= 3 task_sender_generate_gnark_groth16_proofs: @cd crates/task-sender && \ cargo run --release -- generate-proofs \ - --number-of-proofs $(NUMBER_OF_PROOFS) --proof-type gnark_groth16 \ + --number-of-proofs $(NUMBER_OF_PROOFS) --proof-type groth16 \ --dir-to-save-proofs $(CURDIR)/scripts/test_files/task_sender/proofs # ===== DEVNET ===== diff --git a/crates/batcher/.env.dev b/crates/batcher/.env.dev index 40ede80935..e0d1615f4e 100644 --- a/crates/batcher/.env.dev +++ b/crates/batcher/.env.dev @@ -4,5 +4,6 @@ AWS_ACCESS_KEY_ID=test AWS_BUCKET_NAME=aligned.storage UPLOAD_ENDPOINT=http://localhost:4566 DOWNLOAD_ENDPOINT=http://localhost:4566/aligned.storage -RUST_LOG=info +# Override default logs filter +# RUST_LOG=info RUST_BACKTRACE=1 diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index a3d0a670e3..5e25096af8 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -46,7 +46,7 @@ use lambdaworks_crypto::merkle_tree::merkle::MerkleTree; use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend; use log::{debug, error, info, warn}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{Mutex, MutexGuard, RwLock}; +use tokio::sync::{Mutex, RwLock}; use tokio_tungstenite::tungstenite::{Error, Message}; use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority}; use types::errors::{BatcherError, TransactionSendError}; @@ -84,7 +84,22 @@ pub struct Batcher { payment_service_fallback: BatcherPaymentService, service_manager: ServiceManager, service_manager_fallback: ServiceManager, + /// Shared state containing both the user data and the proofs queue. + /// + /// This state is accessed concurrently by the proof messages handler + /// and the batches builder. Any mutation typically consists of: + /// 1. Adding and/or removing a proof from the queue. + /// 2. Updating the corresponding user state based on 1. + /// + /// The `batch_state` lock MUST be held for the full duration of both steps + /// to ensure consistency in the state. batch_state: Mutex, + /// A map of per-user mutexes. + /// + /// It allows us to synchronize the processing of proof messages + /// per user ensuring the state is mutated atomically between each user, + /// while avoiding the need to lock the entire [`batch_state`] structure. + user_proof_processing_mutexes: Mutex>>>, min_block_interval: u64, transaction_wait_timeout: u64, max_proof_size: usize, @@ -93,7 +108,6 @@ pub struct Batcher { last_uploaded_batch_block: Mutex, pre_verification_is_enabled: bool, non_paying_config: Option, - posting_batch: Mutex, disabled_verifiers: Mutex, aggregator_fee_percentage_multiplier: u128, aggregator_gas_cost: u128, @@ -273,8 +287,8 @@ impl Batcher { .batcher .aggregator_fee_percentage_multiplier, aggregator_gas_cost: config.batcher.aggregator_gas_cost, - posting_batch: Mutex::new(false), batch_state: Mutex::new(batch_state), + user_proof_processing_mutexes: Mutex::new(HashMap::new()), disabled_verifiers: Mutex::new(disabled_verifiers), metrics, telemetry, @@ -411,11 +425,9 @@ impl Batcher { } info!("Received new block: {}", block_number); - tokio::spawn(async move { - if let Err(e) = batcher.handle_new_block(block_number).await { - error!("Error when handling new block: {:?}", e); - } - }); + if let Err(e) = batcher.handle_new_block(block_number).await { + error!("Error when handling new block: {:?}", e); + } } error!("Both main and fallback Ethereum WS clients subscriptions have disconnected, will try to reconnect..."); @@ -662,46 +674,6 @@ impl Batcher { nonced_verification_data = aux_verification_data } - // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients - if self.pre_verification_is_enabled { - let verification_data = &nonced_verification_data.verification_data; - if self - .is_verifier_disabled(verification_data.proving_system) - .await - { - warn!( - "Verifier for proving system {} is disabled, skipping verification", - verification_data.proving_system - ); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier( - verification_data.proving_system, - )), - ) - .await; - self.metrics.user_error(&[ - "disabled_verifier", - &format!("{}", verification_data.proving_system), - ]); - return Ok(()); - } - - if !zk_utils::verify(verification_data).await { - error!("Invalid proof detected. Verification failed"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof), - ) - .await; - self.metrics.user_error(&[ - "rejected_proof", - &format!("{}", verification_data.proving_system), - ]); - return Ok(()); - } - } - info!("Handling message"); // We don't need a batch state lock here, since if the user locks its funds @@ -715,11 +687,10 @@ impl Batcher { // If it was not present, then the user nonce is queried to the Aligned contract. // Lastly, we get a lock of the batch state again and insert the user state if it was still missing. - let is_user_in_state: bool; - { + let is_user_in_state: bool = { let batch_state_lock = self.batch_state.lock().await; - is_user_in_state = batch_state_lock.user_states.contains_key(&addr); - } + batch_state_lock.user_states.contains_key(&addr) + }; if !is_user_in_state { let ethereum_user_nonce = match self.get_user_nonce_from_ethereum(addr).await { @@ -760,40 +731,44 @@ impl Batcher { return Ok(()); }; - // For now on until the message is fully processed, the batch state is locked + // For now on until the message is fully processed, the corresponding user mutex is acquired from `user_proof_processing_mutexes` // This is needed because we need to query the user state to make validations and - // finally add the proof to the batch queue. - - let mut batch_state_lock = self.batch_state.lock().await; + // finally add the proof to the batch queue which must be done individually. + // This allows us to process each user without having to lock the whole `batch_state` + debug!("Trying to acquire user mutex for {:?}...", addr); + let user_mutex = { + let mut map = self.user_proof_processing_mutexes.lock().await; + map.entry(addr) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + }; + let _user_mutex = user_mutex.lock().await; + debug!("User mutex for {:?} acquired...", addr_in_msg); let msg_max_fee = nonced_verification_data.max_fee; - let Some(user_last_max_fee_limit) = - batch_state_lock.get_user_last_max_fee_limit(&addr).await - else { - std::mem::drop(batch_state_lock); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); + let (user_last_max_fee_limit, user_accumulated_fee) = { + let batch_state_lock = self.batch_state.lock().await; + let last_max_fee = batch_state_lock.get_user_last_max_fee_limit(&addr).await; + let accumulated_fee = batch_state_lock.get_user_total_fees_in_queue(&addr).await; + (last_max_fee, accumulated_fee) }; - let Some(user_accumulated_fee) = batch_state_lock.get_user_total_fees_in_queue(&addr).await - else { - std::mem::drop(batch_state_lock); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); - }; + let (user_last_max_fee_limit, user_accumulated_fee) = + match (user_last_max_fee_limit, user_accumulated_fee) { + (Some(last_max), Some(accumulated)) => (last_max, accumulated), + _ => { + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + + self.metrics.user_error(&["batcher_state_error", ""]); + return Ok(()); + } + }; if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) { - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InsufficientBalance(addr), @@ -803,11 +778,10 @@ impl Batcher { return Ok(()); } - let cached_user_nonce = batch_state_lock.get_user_nonce(&addr).await; + let cached_user_nonce = self.batch_state.lock().await.get_user_nonce(&addr).await; let Some(expected_nonce) = cached_user_nonce else { error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted"); - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::AddToBatchError, @@ -818,7 +792,6 @@ impl Batcher { }; if expected_nonce < msg_nonce { - std::mem::drop(batch_state_lock); warn!("Invalid nonce for address {addr}, expected nonce: {expected_nonce:?}, received nonce: {msg_nonce:?}"); send_message( ws_conn_sink.clone(), @@ -834,7 +807,6 @@ impl Batcher { if expected_nonce > msg_nonce { info!("Possible replacement message received: Expected nonce {expected_nonce:?} - message nonce: {msg_nonce:?}"); self.handle_replacement_message( - batch_state_lock, nonced_verification_data, ws_conn_sink.clone(), client_msg.signature, @@ -848,7 +820,6 @@ impl Batcher { // We check this after replacement logic because if user wants to replace a proof, their // new_max_fee must be greater or equal than old_max_fee if msg_max_fee > user_last_max_fee_limit { - std::mem::drop(batch_state_lock); warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}"); send_message( ws_conn_sink.clone(), @@ -859,11 +830,34 @@ impl Batcher { return Ok(()); } + // * ---------------------------------------------------* + // * Validate proof * + // * ---------------------------------------------------* + + // Note: While it may seem obvious to run this before `handle_replacement_message` and avoid repeating code + // We intentionally do not run this verification before `handle_replacement_message` + // because this function is "expensive" and it may block for a few milliseconds. + // + // When handling replacement message, before running the verification we make sure + // the user has bumped the fee enough so that running the pre-verification is justified. + if let Err(e) = self + .verify_proof(&nonced_verification_data.verification_data) + .await + { + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(e), + ) + .await; + return Ok(()); + }; + // * ---------------------------------------------------------------------* // * Perform validation over batcher queue * // * ---------------------------------------------------------------------* - if batch_state_lock.is_queue_full() { + if self.batch_state.lock().await.is_queue_full() { + let mut batch_state_lock = self.batch_state.lock().await; debug!("Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry."); // This cannot panic, if the batch queue is full it has at least one item @@ -898,6 +892,7 @@ impl Batcher { batch_state_lock.update_user_state_on_entry_removal(&removed_entry); if let Some(removed_entry_ws) = removed_entry.messaging_sink { + std::mem::drop(batch_state_lock); send_message( removed_entry_ws, SubmitProofResponseMessage::UnderpricedProof, @@ -923,10 +918,8 @@ impl Batcher { // * ---------------------------------------------------------------------* // * Add message data into the queue and update user state * // * ---------------------------------------------------------------------* - if let Err(e) = self .add_to_batch( - batch_state_lock, nonced_verification_data, ws_conn_sink.clone(), signature, @@ -969,7 +962,6 @@ impl Batcher { /// Returns true if the message was replaced in the batch, false otherwise async fn handle_replacement_message( &self, - mut batch_state_lock: MutexGuard<'_, BatchState>, nonced_verification_data: NoncedVerificationData, ws_conn_sink: WsMessageSink, signature: Signature, @@ -977,8 +969,13 @@ impl Batcher { ) { let replacement_max_fee = nonced_verification_data.max_fee; let nonce = nonced_verification_data.nonce; - let Some(entry) = batch_state_lock.get_entry(addr, nonce) else { - std::mem::drop(batch_state_lock); + let Some(entry) = self + .batch_state + .lock() + .await + .get_entry(addr, nonce) + .cloned() + else { warn!("Invalid nonce for address {addr}. Queue entry with nonce {nonce} not found"); send_message( ws_conn_sink.clone(), @@ -991,7 +988,6 @@ impl Batcher { let original_max_fee = entry.nonced_verification_data.max_fee; if original_max_fee > replacement_max_fee { - std::mem::drop(batch_state_lock); warn!("Invalid replacement message for address {addr}, had max fee: {original_max_fee:?}, received fee: {replacement_max_fee:?}"); send_message( ws_conn_sink.clone(), @@ -1003,6 +999,19 @@ impl Batcher { return; } + // If all went well, verify the proof + if let Err(e) = self + .verify_proof(&nonced_verification_data.verification_data) + .await + { + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(e), + ) + .await; + return; + }; + info!("Replacing message for address {addr} with nonce {nonce} and max fee {replacement_max_fee}"); // The replacement entry is built from the old entry and validated for then to be replaced @@ -1030,8 +1039,12 @@ impl Batcher { } replacement_entry.messaging_sink = Some(ws_conn_sink.clone()); - if !batch_state_lock.replacement_entry_is_valid(&replacement_entry) { - std::mem::drop(batch_state_lock); + if !self + .batch_state + .lock() + .await + .replacement_entry_is_valid(&replacement_entry) + { warn!("Invalid replacement message"); send_message( ws_conn_sink.clone(), @@ -1052,7 +1065,26 @@ impl Batcher { // note that the entries are considered equal for the priority queue // if they have the same nonce and sender, so we can remove the old entry // by calling remove with the new entry - batch_state_lock.batch_queue.remove(&replacement_entry); + // + // Note: If `remove` returns `None`, it means the proof was already removed— + // likely by the batch building process running in parallel. + // This is rare but expected due to concurrent access. + let mut batch_state_lock = self.batch_state.lock().await; + if batch_state_lock + .batch_queue + .remove(&replacement_entry) + .is_none() + { + std::mem::drop(batch_state_lock); + warn!("Replacement entry for {addr:?} was not present in batcher queue"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidReplacementMessage, + ) + .await; + return; + }; + batch_state_lock.batch_queue.push( replacement_entry.clone(), BatchQueueEntryPriority::new(replacement_max_fee, nonce), @@ -1128,10 +1160,9 @@ impl Batcher { .await } - /// Adds verification data to the current batch queue. + /// Adds verification data to the current batch queue and updates the user state async fn add_to_batch( &self, - mut batch_state_lock: MutexGuard<'_, BatchState>, verification_data: NoncedVerificationData, ws_conn_sink: WsMessageSink, proof_submitter_sig: Signature, @@ -1143,6 +1174,7 @@ impl Batcher { let max_fee = verification_data.max_fee; let nonce = verification_data.nonce; + let mut batch_state_lock = self.batch_state.lock().await; batch_state_lock.batch_queue.push( BatchQueueEntry::new( verification_data, @@ -1225,10 +1257,7 @@ impl Batcher { block_number: u64, gas_price: U256, ) -> Option> { - let batch_state_lock = self.batch_state.lock().await; - let current_batch_len = batch_state_lock.batch_queue.len(); - let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await; - + let current_batch_len = self.batch_state.lock().await.batch_queue.len(); if current_batch_len < 1 { info!( "Current batch has {} proofs. Waiting for more proofs...", @@ -1237,6 +1266,7 @@ impl Batcher { return None; } + let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await; if block_number < *last_uploaded_batch_block_lock + self.min_block_interval { info!( "Current batch not ready to be posted. Minimium amount of {} blocks have not passed. Block passed: {}", self.min_block_interval, @@ -1245,18 +1275,11 @@ impl Batcher { return None; } - // Check if a batch is currently being posted - let mut batch_posting = self.posting_batch.lock().await; - if *batch_posting { - info!( - "Batch is currently being posted. Waiting for the current batch to be finalized..." - ); - return None; - } - - // Set the batch posting flag to true - *batch_posting = true; - let batch_queue_copy = batch_state_lock.batch_queue.clone(); + info!("Trying to build a new batch..."); + let batch_queue_copy = { + let batch_state_lock = self.batch_state.lock().await; + batch_state_lock.batch_queue.clone() + }; let finalized_batch = batch_queue::try_build_batch( batch_queue_copy, gas_price, @@ -1265,7 +1288,6 @@ impl Batcher { self.constant_gas_cost(), ) .inspect_err(|e| { - *batch_posting = false; match e { // We can't post a batch since users are not willing to pay the needed fee, wait for more proofs BatcherError::BatchCostTooHigh => { @@ -1349,7 +1371,6 @@ impl Batcher { /// The last uploaded batch block is updated once the task is created in Aligned. async fn finalize_batch( &self, - block_number: u64, finalized_batch: Vec, gas_price: U256, ) -> Result<(), BatcherError> { @@ -1381,16 +1402,6 @@ impl Batcher { ) })?; - { - let mut last_uploaded_batch_block = self.last_uploaded_batch_block.lock().await; - // update last uploaded batch block - *last_uploaded_batch_block = block_number; - info!( - "Batch Finalizer: Last uploaded batch block updated to: {}. Lock unlocked", - block_number - ); - } - let leaves: Vec<[u8; 32]> = batch_data_comm .iter() .map(VerificationCommitmentBatch::hash_data) @@ -1405,7 +1416,7 @@ impl Batcher { } // Here we submit the batch on-chain - if let Err(e) = self + match self .submit_batch( &batch_bytes, &batch_merkle_tree.root, @@ -1415,30 +1426,41 @@ impl Batcher { ) .await { - let reason = format!("{:?}", e); - if let Err(e) = self - .telemetry - .task_creation_failed(&hex::encode(batch_merkle_tree.root), &reason) - .await - { - warn!("Failed to send task status to telemetry: {:?}", e); + Ok(block_number) => { + let mut last_uploaded_batch_block = self.last_uploaded_batch_block.lock().await; + // update last uploaded batch block + *last_uploaded_batch_block = block_number; + info!( + "Batch Finalizer: Last uploaded batch block updated to: {}. Lock unlocked", + block_number + ); } - - // decide if i want to flush the queue: - match e { - BatcherError::TransactionSendError( - TransactionSendError::SubmissionInsufficientBalance, - ) => { - // TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch - // this would also need a message sent to the clients - self.flush_queue_and_clear_nonce_cache().await; + Err(e) => { + let reason = format!("{:?}", e); + if let Err(e) = self + .telemetry + .task_creation_failed(&hex::encode(batch_merkle_tree.root), &reason) + .await + { + warn!("Failed to send task status to telemetry: {:?}", e); } - _ => { - // Add more cases here if we want in the future + + // decide if i want to flush the queue: + match e { + BatcherError::TransactionSendError( + TransactionSendError::SubmissionInsufficientBalance, + ) => { + // TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch + // this would also need a message sent to the clients + self.flush_queue_and_clear_nonce_cache().await; + } + _ => { + // Add more cases here if we want in the future + } } - } - return Err(e); + return Err(e); + } }; // Once the submit is succesfull, we remove the submitted proofs from the queue @@ -1516,13 +1538,9 @@ impl Batcher { if let Some(finalized_batch) = self.is_batch_ready(block_number, modified_gas_price).await { let batch_finalization_result = self - .finalize_batch(block_number, finalized_batch, modified_gas_price) + .finalize_batch(finalized_batch, modified_gas_price) .await; - // Resetting this here to avoid doing it on every return path of `finalize_batch` function - let mut batch_posting = self.posting_batch.lock().await; - *batch_posting = false; - batch_finalization_result?; } @@ -1537,7 +1555,7 @@ impl Batcher { leaves: Vec<[u8; 32]>, finalized_batch: &[BatchQueueEntry], gas_price: U256, - ) -> Result<(), BatcherError> { + ) -> Result { let batch_merkle_root_hex = hex::encode(batch_merkle_root); info!("Batch merkle root: 0x{}", batch_merkle_root_hex); let file_name = batch_merkle_root_hex.clone() + ".json"; @@ -1609,10 +1627,10 @@ impl Batcher { ) .await { - Ok(_) => { + Ok(res) => { info!("Batch verification task created on Aligned contract"); self.metrics.sent_batches.inc(); - Ok(()) + Ok(res.block_number.map(|e| e.as_u64()).unwrap_or_default()) } Err(e) => { error!("Failed to send batch to contract: {:?}", e); @@ -2040,4 +2058,41 @@ impl Batcher { true } + + /// Takes [`VerificationData`] and spawns a blocking task to verify the proof + async fn verify_proof( + &self, + verification_data: &VerificationData, + ) -> Result<(), ProofInvalidReason> { + if !self.pre_verification_is_enabled { + return Ok(()); + } + if self + .is_verifier_disabled(verification_data.proving_system) + .await + { + warn!( + "Verifier for proving system {} is disabled, skipping verification", + verification_data.proving_system + ); + self.metrics.user_error(&[ + "disabled_verifier", + &format!("{}", verification_data.proving_system), + ]); + return Err(ProofInvalidReason::DisabledVerifier( + verification_data.proving_system, + )); + } + + if !zk_utils::verify(verification_data).await { + error!("Invalid proof detected. Verification failed"); + self.metrics.user_error(&[ + "rejected_proof", + &format!("{}", verification_data.proving_system), + ]); + return Err(ProofInvalidReason::RejectedProof); + } + + Ok(()) + } } diff --git a/crates/batcher/src/main.rs b/crates/batcher/src/main.rs index 4cdcad3566..4594292b89 100644 --- a/crates/batcher/src/main.rs +++ b/crates/batcher/src/main.rs @@ -39,7 +39,12 @@ async fn main() -> Result<(), BatcherError> { None => dotenvy::dotenv().ok(), }; - env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); + env_logger::Builder::from_env(Env::default().filter_or( + "RUST_LOG", + "info,aligned_batcher=debug,ethers_providers=off", + )) + .init(); + let batcher = Batcher::new(cli.config).await; let batcher = Arc::new(batcher);