From 08d51b64cba6e65975489367a15d4db392cb1ee3 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Mon, 30 Jun 2025 10:06:08 -0300 Subject: [PATCH 01/18] feat: move verification after the message and replacements validations --- crates/batcher/src/lib.rs | 92 ++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index a3d0a670e3..fa4e5cb89b 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -613,6 +613,9 @@ impl Batcher { debug!("Received message with nonce: {msg_nonce:?}"); self.metrics.received_proofs.inc(); + // TODO: check if the user is already being attended + // TODO: check if a batch is being built + // * ---------------------------------------------------* // * Perform validations over the message * // * ---------------------------------------------------* @@ -662,46 +665,6 @@ impl Batcher { nonced_verification_data = aux_verification_data } - // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients - if self.pre_verification_is_enabled { - let verification_data = &nonced_verification_data.verification_data; - if self - .is_verifier_disabled(verification_data.proving_system) - .await - { - warn!( - "Verifier for proving system {} is disabled, skipping verification", - verification_data.proving_system - ); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier( - verification_data.proving_system, - )), - ) - .await; - self.metrics.user_error(&[ - "disabled_verifier", - &format!("{}", verification_data.proving_system), - ]); - return Ok(()); - } - - if !zk_utils::verify(verification_data).await { - error!("Invalid proof detected. 
Verification failed"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof), - ) - .await; - self.metrics.user_error(&[ - "rejected_proof", - &format!("{}", verification_data.proving_system), - ]); - return Ok(()); - } - } - info!("Handling message"); // We don't need a batch state lock here, since if the user locks its funds @@ -715,11 +678,10 @@ impl Batcher { // If it was not present, then the user nonce is queried to the Aligned contract. // Lastly, we get a lock of the batch state again and insert the user state if it was still missing. - let is_user_in_state: bool; - { + let is_user_in_state: bool = { let batch_state_lock = self.batch_state.lock().await; - is_user_in_state = batch_state_lock.user_states.contains_key(&addr); - } + batch_state_lock.user_states.contains_key(&addr) + }; if !is_user_in_state { let ethereum_user_nonce = match self.get_user_nonce_from_ethereum(addr).await { @@ -859,6 +821,8 @@ impl Batcher { return Ok(()); } + self.verify_proof(&nonced_verification_data); + // * ---------------------------------------------------------------------* // * Perform validation over batcher queue * // * ---------------------------------------------------------------------* @@ -1003,6 +967,9 @@ impl Batcher { return; } + // if all went well, verify the proof + self.verify_proof(&nonced_verification_data); + info!("Replacing message for address {addr} with nonce {nonce} and max fee {replacement_max_fee}"); // The replacement entry is built from the old entry and validated for then to be replaced @@ -2040,4 +2007,41 @@ impl Batcher { true } + + async fn verify_proof( + &self, + nonced_verification_data: &NoncedVerificationData, + ) -> Result<(), ProofInvalidReason> { + if !self.pre_verification_is_enabled { + return Ok(()); + } + let verification_data = &nonced_verification_data.verification_data; + if self + .is_verifier_disabled(verification_data.proving_system) + .await + { + warn!( + 
"Verifier for proving system {} is disabled, skipping verification", + verification_data.proving_system + ); + self.metrics.user_error(&[ + "disabled_verifier", + &format!("{}", verification_data.proving_system), + ]); + return Err(ProofInvalidReason::DisabledVerifier( + verification_data.proving_system, + )); + } + + if !zk_utils::verify(verification_data).await { + error!("Invalid proof detected. Verification failed"); + self.metrics.user_error(&[ + "rejected_proof", + &format!("{}", verification_data.proving_system), + ]); + return Err(ProofInvalidReason::RejectedProof); + } + + Ok(()) + } } From 766fa70033b038d9dc714b78529f3279f4922b3c Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Mon, 30 Jun 2025 17:06:18 -0300 Subject: [PATCH 02/18] feat: proof processing via channels --- crates/batcher/src/lib.rs | 1 + crates/batcher/src/proof_processor.rs | 99 +++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 crates/batcher/src/proof_processor.rs diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index fa4e5cb89b..18afc66b73 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -61,6 +61,7 @@ mod eth; mod ffi; pub mod gnark; pub mod metrics; +mod proof_processor; pub mod retry; pub mod risc_zero; pub mod s3; diff --git a/crates/batcher/src/proof_processor.rs b/crates/batcher/src/proof_processor.rs new file mode 100644 index 0000000000..b105c5addb --- /dev/null +++ b/crates/batcher/src/proof_processor.rs @@ -0,0 +1,99 @@ +use std::{collections::HashMap, sync::Arc}; + +use aligned_sdk::common::types::SubmitProofResponseMessage; +use ethers::abi::Address; +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + oneshot, Mutex, +}; + +use crate::types::batch_state::BatchState; + +pub enum ProofProcessorMessage { + ValidateProof { + response_tx: oneshot::Sender, + }, + StopProcessing { + response_tx: oneshot::Sender, + }, + RestartProcessing, +} + +pub struct ProofProcessorForwarder { + tx: Sender, +} + 
+impl ProofProcessorForwarder { + /// Validates a proof for the given proof + pub fn validate_proof(&self) {} + /// Stops the processing of proofs and returns true when all the ongoing messages have been processed + pub fn stop_processing(&self) {} + /// Restarts the processing of proofs + pub fn restart_processing(&self) {} +} + +pub struct ProofProcessor { + user_mutexes: HashMap>, + batch_state: Arc>, + rx: Receiver, + processing_stopped: bool, + current_messages: Arc>, +} + +impl ProofProcessor { + pub fn new(batch_state: Arc>) -> (Self, Sender) { + let (tx, rx) = mpsc::channel::(100); + let processor = Self { + batch_state, + user_mutexes: HashMap::new(), + rx, + processing_stopped: false, + current_messages: Arc::new(Mutex::new(0)), + }; + (processor, tx) + } + + async fn start(&mut self) { + self.recv().await; + } + + async fn recv(&mut self) { + while let Some(message) = self.rx.recv().await { + match message { + ProofProcessorMessage::ValidateProof { response_tx } => { + // check if processing should stop because the batcher requested for it + if self.processing_stopped { + continue; + } + + *self.current_messages.lock().await += 1; + let current_messages_clone = self.current_messages.clone(); + let batch_state_clone = self.batch_state.clone(); + tokio::task::spawn(async move { + // TODO: lock user mutex check, if is being attended stop + let result = Self::handle_validate_proof_message(batch_state_clone).await; + let _ = response_tx.send(result); + *current_messages_clone.lock().await -= 1; + }); + } + ProofProcessorMessage::StopProcessing { response_tx } => { + self.processing_stopped = true; + // wait until it can lock all the mutex of the users + // we know that no new mutex would appear since we have set processing_stopped = true + for mutex in self.user_mutexes.values() { + let _ = mutex.lock().await; + } + + let _ = response_tx.send(true); + } + ProofProcessorMessage::RestartProcessing => self.processing_stopped = false, + } + } + } + + async fn 
handle_validate_proof_message( + batch_state: Arc>, + ) -> SubmitProofResponseMessage { + SubmitProofResponseMessage::InvalidMaxFee + } +} From f48157d76297f8a34dd0b4cde81e1296e5bfe9c7 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Mon, 30 Jun 2025 17:54:37 -0300 Subject: [PATCH 03/18] feat: lock per user implementation --- crates/batcher/src/lib.rs | 77 ++++++++++++++++++++++++++++----------- 1 file changed, 56 insertions(+), 21 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index 18afc66b73..a059fff5d6 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -86,6 +86,7 @@ pub struct Batcher { service_manager: ServiceManager, service_manager_fallback: ServiceManager, batch_state: Mutex, + user_mutexes: Mutex>>>, min_block_interval: u64, transaction_wait_timeout: u64, max_proof_size: usize, @@ -276,6 +277,7 @@ impl Batcher { aggregator_gas_cost: config.batcher.aggregator_gas_cost, posting_batch: Mutex::new(false), batch_state: Mutex::new(batch_state), + user_mutexes: Mutex::new(HashMap::new()), disabled_verifiers: Mutex::new(disabled_verifiers), metrics, telemetry, @@ -679,6 +681,7 @@ impl Batcher { // If it was not present, then the user nonce is queried to the Aligned contract. // Lastly, we get a lock of the batch state again and insert the user state if it was still missing. + // Step 1: Get or insert the per-address mutex under lock let is_user_in_state: bool = { let batch_state_lock = self.batch_state.lock().await; batch_state_lock.user_states.contains_key(&addr) @@ -727,13 +730,22 @@ impl Batcher { // This is needed because we need to query the user state to make validations and // finally add the proof to the batch queue. 
- let mut batch_state_lock = self.batch_state.lock().await; + let user_mutex = { + let mut map = self.user_mutexes.lock().await; + map.entry(addr) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + }; + let _ = user_mutex.lock().await; let msg_max_fee = nonced_verification_data.max_fee; - let Some(user_last_max_fee_limit) = - batch_state_lock.get_user_last_max_fee_limit(&addr).await + let Some(user_last_max_fee_limit) = self + .batch_state + .lock() + .await + .get_user_last_max_fee_limit(&addr) + .await else { - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::AddToBatchError, @@ -743,9 +755,13 @@ impl Batcher { return Ok(()); }; - let Some(user_accumulated_fee) = batch_state_lock.get_user_total_fees_in_queue(&addr).await + let Some(user_accumulated_fee) = self + .batch_state + .lock() + .await + .get_user_total_fees_in_queue(&addr) + .await else { - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::AddToBatchError, @@ -756,7 +772,6 @@ impl Batcher { }; if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) { - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InsufficientBalance(addr), @@ -766,11 +781,10 @@ impl Batcher { return Ok(()); } - let cached_user_nonce = batch_state_lock.get_user_nonce(&addr).await; + let cached_user_nonce = self.batch_state.lock().await.get_user_nonce(&addr).await; let Some(expected_nonce) = cached_user_nonce else { error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted"); - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::AddToBatchError, @@ -781,7 +795,6 @@ impl Batcher { }; if expected_nonce < msg_nonce { - std::mem::drop(batch_state_lock); warn!("Invalid nonce for address {addr}, expected nonce: {expected_nonce:?}, received nonce: 
{msg_nonce:?}"); send_message( ws_conn_sink.clone(), @@ -797,7 +810,6 @@ impl Batcher { if expected_nonce > msg_nonce { info!("Possible replacement message received: Expected nonce {expected_nonce:?} - message nonce: {msg_nonce:?}"); self.handle_replacement_message( - batch_state_lock, nonced_verification_data, ws_conn_sink.clone(), client_msg.signature, @@ -811,7 +823,6 @@ impl Batcher { // We check this after replacement logic because if user wants to replace a proof, their // new_max_fee must be greater or equal than old_max_fee if msg_max_fee > user_last_max_fee_limit { - std::mem::drop(batch_state_lock); warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}"); send_message( ws_conn_sink.clone(), @@ -822,13 +833,21 @@ impl Batcher { return Ok(()); } - self.verify_proof(&nonced_verification_data); + if let Err(e) = self.verify_proof(&nonced_verification_data).await { + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(e), + ) + .await; + return Ok(()); + }; // * ---------------------------------------------------------------------* // * Perform validation over batcher queue * // * ---------------------------------------------------------------------* - if batch_state_lock.is_queue_full() { + if self.batch_state.lock().await.is_queue_full() { + let mut batch_state_lock = self.batch_state.lock().await; debug!("Batch queue is full. 
Evaluating if the incoming proof can replace a lower-priority entry."); // This cannot panic, if the batch queue is full it has at least one item @@ -889,6 +908,7 @@ impl Batcher { // * Add message data into the queue and update user state * // * ---------------------------------------------------------------------* + let mut batch_state_lock = self.batch_state.lock().await; if let Err(e) = self .add_to_batch( batch_state_lock, @@ -934,7 +954,6 @@ impl Batcher { /// Returns true if the message was replaced in the batch, false otherwise async fn handle_replacement_message( &self, - mut batch_state_lock: MutexGuard<'_, BatchState>, nonced_verification_data: NoncedVerificationData, ws_conn_sink: WsMessageSink, signature: Signature, @@ -942,8 +961,13 @@ impl Batcher { ) { let replacement_max_fee = nonced_verification_data.max_fee; let nonce = nonced_verification_data.nonce; - let Some(entry) = batch_state_lock.get_entry(addr, nonce) else { - std::mem::drop(batch_state_lock); + let Some(entry) = self + .batch_state + .lock() + .await + .get_entry(addr, nonce) + .cloned() + else { warn!("Invalid nonce for address {addr}. 
Queue entry with nonce {nonce} not found"); send_message( ws_conn_sink.clone(), @@ -956,7 +980,6 @@ impl Batcher { let original_max_fee = entry.nonced_verification_data.max_fee; if original_max_fee > replacement_max_fee { - std::mem::drop(batch_state_lock); warn!("Invalid replacement message for address {addr}, had max fee: {original_max_fee:?}, received fee: {replacement_max_fee:?}"); send_message( ws_conn_sink.clone(), @@ -969,7 +992,14 @@ impl Batcher { } // if all went well, verify the proof - self.verify_proof(&nonced_verification_data); + if let Err(e) = self.verify_proof(&nonced_verification_data).await { + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(e), + ) + .await; + return; + }; info!("Replacing message for address {addr} with nonce {nonce} and max fee {replacement_max_fee}"); @@ -998,8 +1028,12 @@ impl Batcher { } replacement_entry.messaging_sink = Some(ws_conn_sink.clone()); - if !batch_state_lock.replacement_entry_is_valid(&replacement_entry) { - std::mem::drop(batch_state_lock); + if !self + .batch_state + .lock() + .await + .replacement_entry_is_valid(&replacement_entry) + { warn!("Invalid replacement message"); send_message( ws_conn_sink.clone(), @@ -1020,6 +1054,7 @@ impl Batcher { // note that the entries are considered equal for the priority queue // if they have the same nonce and sender, so we can remove the old entry // by calling remove with the new entry + let mut batch_state_lock = self.batch_state.lock().await; batch_state_lock.batch_queue.remove(&replacement_entry); batch_state_lock.batch_queue.push( replacement_entry.clone(), From 44c36e69056cccfbf0b8a5f677c047eff7889545 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Mon, 30 Jun 2025 17:59:30 -0300 Subject: [PATCH 04/18] feat: batch building mutex --- crates/batcher/src/lib.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index a059fff5d6..0997ff001b 100644 
--- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -86,6 +86,7 @@ pub struct Batcher { service_manager: ServiceManager, service_manager_fallback: ServiceManager, batch_state: Mutex, + batch_building_mutex: Mutex<()>, user_mutexes: Mutex>>>, min_block_interval: u64, transaction_wait_timeout: u64, @@ -278,6 +279,7 @@ impl Batcher { posting_batch: Mutex::new(false), batch_state: Mutex::new(batch_state), user_mutexes: Mutex::new(HashMap::new()), + batch_building_mutex: Mutex::new(()), disabled_verifiers: Mutex::new(disabled_verifiers), metrics, telemetry, @@ -616,8 +618,9 @@ impl Batcher { debug!("Received message with nonce: {msg_nonce:?}"); self.metrics.received_proofs.inc(); - // TODO: check if the user is already being attended - // TODO: check if a batch is being built + // if this is locked, then it means that the a batch is being built + // so we need to stop the processing + self.batch_building_mutex.lock().await; // * ---------------------------------------------------* // * Perform validations over the message * @@ -908,7 +911,7 @@ impl Batcher { // * Add message data into the queue and update user state * // * ---------------------------------------------------------------------* - let mut batch_state_lock = self.batch_state.lock().await; + let batch_state_lock = self.batch_state.lock().await; if let Err(e) = self .add_to_batch( batch_state_lock, @@ -1228,7 +1231,12 @@ impl Batcher { block_number: u64, gas_price: U256, ) -> Option> { + let batch_building_mutex = self.batch_building_mutex.lock().await; let batch_state_lock = self.batch_state.lock().await; + // acquire all the user locks to make sure all the ongoing message have been processed + for user_mutex in self.user_mutexes.lock().await.values() { + let _ = user_mutex.lock().await; + } let current_batch_len = batch_state_lock.batch_queue.len(); let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await; From ef8c9a3937026163d0e8a4692c1f53c850ea1f84 Mon Sep 17 
00:00:00 2001 From: Marcos Nicolau Date: Tue, 1 Jul 2025 11:55:39 -0300 Subject: [PATCH 05/18] refactor: move user update to batch state --- crates/batcher/src/lib.rs | 64 +++++++------------------ crates/batcher/src/types/batch_state.rs | 32 +++++++++++++ 2 files changed, 50 insertions(+), 46 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index 0997ff001b..15530d168f 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -620,7 +620,7 @@ impl Batcher { // if this is locked, then it means that the a batch is being built // so we need to stop the processing - self.batch_building_mutex.lock().await; + let _ = self.batch_building_mutex.lock().await; // * ---------------------------------------------------* // * Perform validations over the message * @@ -910,11 +910,8 @@ impl Batcher { // * ---------------------------------------------------------------------* // * Add message data into the queue and update user state * // * ---------------------------------------------------------------------* - - let batch_state_lock = self.batch_state.lock().await; if let Err(e) = self .add_to_batch( - batch_state_lock, nonced_verification_data, ws_conn_sink.clone(), signature, @@ -928,6 +925,21 @@ impl Batcher { return Ok(()); }; + if let Err(_) = self + .batch_state + .lock() + .await + .update_user_after_adding_proof(addr, msg_nonce, msg_max_fee) + .await + { + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + return Ok(()); + } + info!("Verification data message handled"); Ok(()) } @@ -1137,7 +1149,6 @@ impl Batcher { /// Adds verification data to the current batch queue. 
async fn add_to_batch( &self, - mut batch_state_lock: MutexGuard<'_, BatchState>, verification_data: NoncedVerificationData, ws_conn_sink: WsMessageSink, proof_submitter_sig: Signature, @@ -1149,6 +1160,7 @@ impl Batcher { let max_fee = verification_data.max_fee; let nonce = verification_data.nonce; + let mut batch_state_lock = self.batch_state.lock().await; batch_state_lock.batch_queue.push( BatchQueueEntry::new( verification_data, @@ -1168,46 +1180,6 @@ impl Batcher { info!("Current batch queue length: {}", queue_len); - let Some(user_proof_count) = batch_state_lock - .get_user_proof_count(&proof_submitter_addr) - .await - else { - error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); - std::mem::drop(batch_state_lock); - return Err(BatcherError::AddressNotFoundInUserStates( - proof_submitter_addr, - )); - }; - - let Some(current_total_fees_in_queue) = batch_state_lock - .get_user_total_fees_in_queue(&proof_submitter_addr) - .await - else { - error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); - std::mem::drop(batch_state_lock); - return Err(BatcherError::AddressNotFoundInUserStates( - proof_submitter_addr, - )); - }; - - // User state is updated - if batch_state_lock - .update_user_state( - &proof_submitter_addr, - nonce + U256::one(), - max_fee, - user_proof_count + 1, - current_total_fees_in_queue + max_fee, - ) - .is_none() - { - error!("User state of address {proof_submitter_addr} was not found when trying to update user state. 
This user state should have been present"); - std::mem::drop(batch_state_lock); - return Err(BatcherError::AddressNotFoundInUserStates( - proof_submitter_addr, - )); - }; - Ok(()) } @@ -1231,7 +1203,7 @@ impl Batcher { block_number: u64, gas_price: U256, ) -> Option> { - let batch_building_mutex = self.batch_building_mutex.lock().await; + let _ = self.batch_building_mutex.lock().await; let batch_state_lock = self.batch_state.lock().await; // acquire all the user locks to make sure all the ongoing message have been processed for user_mutex in self.user_mutexes.lock().await.values() { diff --git a/crates/batcher/src/types/batch_state.rs b/crates/batcher/src/types/batch_state.rs index 481ca44f74..9e60fcef18 100644 --- a/crates/batcher/src/types/batch_state.rs +++ b/crates/batcher/src/types/batch_state.rs @@ -176,6 +176,38 @@ impl BatchState { None } + pub(crate) async fn update_user_after_adding_proof( + &mut self, + addr: Address, + nonce: U256, + max_fee: U256, + ) -> Result<(), String> { + let Some(user_proof_count) = self.get_user_proof_count(&addr).await else { + return Err("user proof count".into()); + }; + + let Some(current_total_fees_in_queue) = self.get_user_total_fees_in_queue(&addr).await + else { + return Err("user total fees in queue not found".into()); + }; + + // User state is updated + if self + .update_user_state( + &addr, + nonce + U256::one(), + max_fee, + user_proof_count + 1, + current_total_fees_in_queue + max_fee, + ) + .is_none() + { + return Err("user not found".into()); + }; + + Ok(()) + } + // LOGIC: pub(crate) fn calculate_new_user_states_data(&self) -> HashMap { From d25bef116312fa2d536d47a8eeaf9e40948f5293 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 1 Jul 2025 12:06:06 -0300 Subject: [PATCH 06/18] chore: address clippy warnings --- crates/batcher/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index 15530d168f..43b72d82b6 100644 --- 
a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -46,7 +46,7 @@ use lambdaworks_crypto::merkle_tree::merkle::MerkleTree; use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend; use log::{debug, error, info, warn}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{Mutex, MutexGuard, RwLock}; +use tokio::sync::{Mutex, RwLock}; use tokio_tungstenite::tungstenite::{Error, Message}; use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority}; use types::errors::{BatcherError, TransactionSendError}; @@ -61,7 +61,6 @@ mod eth; mod ffi; pub mod gnark; pub mod metrics; -mod proof_processor; pub mod retry; pub mod risc_zero; pub mod s3; @@ -925,12 +924,13 @@ impl Batcher { return Ok(()); }; - if let Err(_) = self + if self .batch_state .lock() .await .update_user_after_adding_proof(addr, msg_nonce, msg_max_fee) .await + .is_err() { send_message( ws_conn_sink.clone(), From 653283c2d871c8641f8fcccde9b54cd9b4c8b8ec Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 1 Jul 2025 14:13:37 -0300 Subject: [PATCH 07/18] fix: handle new block without spawning a task; instead wait until it finishes and update the last block number with the receipt --- crates/batcher/src/lib.rs | 145 +++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 74 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index 43b72d82b6..57e8b31291 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -415,11 +415,9 @@ impl Batcher { } info!("Received new block: {}", block_number); - tokio::spawn(async move { - if let Err(e) = batcher.handle_new_block(block_number).await { - error!("Error when handling new block: {:?}", e); - } - }); + if let Err(e) = batcher.handle_new_block(block_number).await { + error!("Error when handling new block: {:?}", e); + } } error!("Both main and fallback Ethereum WS clients subscriptions have disconnected, will try to reconnect..."); @@ -619,7 +617,9 @@ impl Batcher
{ // if this is locked, then it means that the a batch is being built // so we need to stop the processing + debug!("Checking if a batch is being built before proceeding with the message"); let _ = self.batch_building_mutex.lock().await; + debug!("Batch building has finished or did't started, proceeding with the message"); // * ---------------------------------------------------* // * Perform validations over the message * @@ -732,45 +732,36 @@ impl Batcher { // This is needed because we need to query the user state to make validations and // finally add the proof to the batch queue. + debug!("Trying to acquire user mutex for {:?}...", addr_in_msg); let user_mutex = { let mut map = self.user_mutexes.lock().await; map.entry(addr) .or_insert_with(|| Arc::new(Mutex::new(()))) .clone() }; - let _ = user_mutex.lock().await; + let _user_mutex = user_mutex.lock().await; + debug!("User mutex for {:?} acquired...", addr_in_msg); let msg_max_fee = nonced_verification_data.max_fee; - let Some(user_last_max_fee_limit) = self - .batch_state - .lock() - .await - .get_user_last_max_fee_limit(&addr) - .await - else { - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); - }; + let (user_last_max_fee_limit, user_accumulated_fee) = { + let batch_state_lock = self.batch_state.lock().await; + let last_max_fee = batch_state_lock.get_user_last_max_fee_limit(&addr).await; + let accumulated_fee = batch_state_lock.get_user_total_fees_in_queue(&addr).await; + drop(batch_state_lock); - let Some(user_accumulated_fee) = self - .batch_state - .lock() - .await - .get_user_total_fees_in_queue(&addr) - .await - else { - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); + match (last_max_fee, accumulated_fee) { + (Some(last_max), Some(accumulated)) => 
(last_max, accumulated), + _ => { + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + + self.metrics.user_error(&["batcher_state_error", ""]); + return Ok(()); + } + } }; if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) { @@ -1203,15 +1194,18 @@ impl Batcher { block_number: u64, gas_price: U256, ) -> Option> { + info!("Batch building: started, acquiring lock to stop processing new messages..."); let _ = self.batch_building_mutex.lock().await; - let batch_state_lock = self.batch_state.lock().await; + + info!("Batch building: waiting until all the ongoing messages finish"); // acquire all the user locks to make sure all the ongoing message have been processed for user_mutex in self.user_mutexes.lock().await.values() { let _ = user_mutex.lock().await; } - let current_batch_len = batch_state_lock.batch_queue.len(); - let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await; + info!("Batch building: all locks acquired, proceeding to build batch"); + let batch_state_lock = self.batch_state.lock().await; + let current_batch_len = batch_state_lock.batch_queue.len(); if current_batch_len < 1 { info!( "Current batch has {} proofs. Waiting for more proofs...", @@ -1220,6 +1214,7 @@ impl Batcher { return None; } + let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await; if block_number < *last_uploaded_batch_block_lock + self.min_block_interval { info!( "Current batch not ready to be posted. Minimium amount of {} blocks have not passed. Block passed: {}", self.min_block_interval, @@ -1332,7 +1327,6 @@ impl Batcher { /// The last uploaded batch block is updated once the task is created in Aligned. 
async fn finalize_batch( &self, - block_number: u64, finalized_batch: Vec, gas_price: U256, ) -> Result<(), BatcherError> { @@ -1364,16 +1358,6 @@ impl Batcher { ) })?; - { - let mut last_uploaded_batch_block = self.last_uploaded_batch_block.lock().await; - // update last uploaded batch block - *last_uploaded_batch_block = block_number; - info!( - "Batch Finalizer: Last uploaded batch block updated to: {}. Lock unlocked", - block_number - ); - } - let leaves: Vec<[u8; 32]> = batch_data_comm .iter() .map(VerificationCommitmentBatch::hash_data) @@ -1388,7 +1372,7 @@ impl Batcher { } // Here we submit the batch on-chain - if let Err(e) = self + match self .submit_batch( &batch_bytes, &batch_merkle_tree.root, @@ -1398,30 +1382,41 @@ impl Batcher { ) .await { - let reason = format!("{:?}", e); - if let Err(e) = self - .telemetry - .task_creation_failed(&hex::encode(batch_merkle_tree.root), &reason) - .await - { - warn!("Failed to send task status to telemetry: {:?}", e); + Ok(block_number) => { + let mut last_uploaded_batch_block = self.last_uploaded_batch_block.lock().await; + // update last uploaded batch block + *last_uploaded_batch_block = block_number; + info!( + "Batch Finalizer: Last uploaded batch block updated to: {}. 
Lock unlocked", + block_number + ); } - - // decide if i want to flush the queue: - match e { - BatcherError::TransactionSendError( - TransactionSendError::SubmissionInsufficientBalance, - ) => { - // TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch - // this would also need a message sent to the clients - self.flush_queue_and_clear_nonce_cache().await; + Err(e) => { + let reason = format!("{:?}", e); + if let Err(e) = self + .telemetry + .task_creation_failed(&hex::encode(batch_merkle_tree.root), &reason) + .await + { + warn!("Failed to send task status to telemetry: {:?}", e); } - _ => { - // Add more cases here if we want in the future + + // decide if i want to flush the queue: + match e { + BatcherError::TransactionSendError( + TransactionSendError::SubmissionInsufficientBalance, + ) => { + // TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch + // this would also need a message sent to the clients + self.flush_queue_and_clear_nonce_cache().await; + } + _ => { + // Add more cases here if we want in the future + } } - } - return Err(e); + return Err(e); + } }; // Once the submit is succesfull, we remove the submitted proofs from the queue @@ -1499,7 +1494,7 @@ impl Batcher { if let Some(finalized_batch) = self.is_batch_ready(block_number, modified_gas_price).await { let batch_finalization_result = self - .finalize_batch(block_number, finalized_batch, modified_gas_price) + .finalize_batch(finalized_batch, modified_gas_price) .await; // Resetting this here to avoid doing it on every return path of `finalize_batch` function @@ -1520,7 +1515,7 @@ impl Batcher { leaves: Vec<[u8; 32]>, finalized_batch: &[BatchQueueEntry], gas_price: U256, - ) -> Result<(), BatcherError> { + ) -> Result { let batch_merkle_root_hex = hex::encode(batch_merkle_root); info!("Batch merkle root: 0x{}", batch_merkle_root_hex); let file_name = batch_merkle_root_hex.clone() + ".json"; @@ -1592,10 
+1587,10 @@ impl Batcher { ) .await { - Ok(_) => { + Ok(res) => { info!("Batch verification task created on Aligned contract"); self.metrics.sent_batches.inc(); - Ok(()) + Ok(res.block_number.map(|e| e.as_u64()).unwrap_or_default()) } Err(e) => { error!("Failed to send batch to contract: {:?}", e); @@ -2028,6 +2023,8 @@ impl Batcher { &self, nonced_verification_data: &NoncedVerificationData, ) -> Result<(), ProofInvalidReason> { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + if !self.pre_verification_is_enabled { return Ok(()); } From 067b64ac45145bda8bdbe2f50c537b66808925d7 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 1 Jul 2025 14:15:49 -0300 Subject: [PATCH 08/18] chore: better tracing for debug mode --- crates/batcher/.env.dev | 3 ++- crates/batcher/src/main.rs | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/batcher/.env.dev b/crates/batcher/.env.dev index 40ede80935..e0d1615f4e 100644 --- a/crates/batcher/.env.dev +++ b/crates/batcher/.env.dev @@ -4,5 +4,6 @@ AWS_ACCESS_KEY_ID=test AWS_BUCKET_NAME=aligned.storage UPLOAD_ENDPOINT=http://localhost:4566 DOWNLOAD_ENDPOINT=http://localhost:4566/aligned.storage -RUST_LOG=info +# Override default logs filter +# RUST_LOG=info RUST_BACKTRACE=1 diff --git a/crates/batcher/src/main.rs b/crates/batcher/src/main.rs index 4cdcad3566..4594292b89 100644 --- a/crates/batcher/src/main.rs +++ b/crates/batcher/src/main.rs @@ -39,7 +39,12 @@ async fn main() -> Result<(), BatcherError> { None => dotenvy::dotenv().ok(), }; - env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); + env_logger::Builder::from_env(Env::default().filter_or( + "RUST_LOG", + "info,aligned_batcher=debug,ethers_providers=off", + )) + .init(); + let batcher = Batcher::new(cli.config).await; let batcher = Arc::new(batcher); From 4aabc5332d44823393b32e3b35cfc460caffd7f5 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 1 Jul 2025 14:24:44 -0300 Subject: [PATCH 
09/18] fix: remove sleep from verify_proof --- crates/batcher/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index 57e8b31291..3814f3dc70 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -1195,7 +1195,7 @@ impl Batcher { gas_price: U256, ) -> Option> { info!("Batch building: started, acquiring lock to stop processing new messages..."); - let _ = self.batch_building_mutex.lock().await; + let _batch_building_mutex = self.batch_building_mutex.lock().await; info!("Batch building: waiting until all the ongoing messages finish"); // acquire all the user locks to make sure all the ongoing message have been processed @@ -2023,8 +2023,6 @@ impl Batcher { &self, nonced_verification_data: &NoncedVerificationData, ) -> Result<(), ProofInvalidReason> { - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - if !self.pre_verification_is_enabled { return Ok(()); } From ae5583428a90adcda577b1f7a00c4e578edc2935 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 1 Jul 2025 15:08:12 -0300 Subject: [PATCH 10/18] chore: remove proof processor --- crates/batcher/src/proof_processor.rs | 99 --------------------------- 1 file changed, 99 deletions(-) delete mode 100644 crates/batcher/src/proof_processor.rs diff --git a/crates/batcher/src/proof_processor.rs b/crates/batcher/src/proof_processor.rs deleted file mode 100644 index b105c5addb..0000000000 --- a/crates/batcher/src/proof_processor.rs +++ /dev/null @@ -1,99 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use aligned_sdk::common::types::SubmitProofResponseMessage; -use ethers::abi::Address; -use tokio::sync::{ - mpsc::{self, Receiver, Sender}, - oneshot, Mutex, -}; - -use crate::types::batch_state::BatchState; - -pub enum ProofProcessorMessage { - ValidateProof { - response_tx: oneshot::Sender, - }, - StopProcessing { - response_tx: oneshot::Sender, - }, - RestartProcessing, -} - -pub struct 
ProofProcessorForwarder { - tx: Sender, -} - -impl ProofProcessorForwarder { - /// Validates a proof for the given proof - pub fn validate_proof(&self) {} - /// Stops the processing of proofs and returns true when all the ongoing messages have been processed - pub fn stop_processing(&self) {} - /// Restarts the processing of proofs - pub fn restart_processing(&self) {} -} - -pub struct ProofProcessor { - user_mutexes: HashMap>, - batch_state: Arc>, - rx: Receiver, - processing_stopped: bool, - current_messages: Arc>, -} - -impl ProofProcessor { - pub fn new(batch_state: Arc>) -> (Self, Sender) { - let (tx, rx) = mpsc::channel::(100); - let processor = Self { - batch_state, - user_mutexes: HashMap::new(), - rx, - processing_stopped: false, - current_messages: Arc::new(Mutex::new(0)), - }; - (processor, tx) - } - - async fn start(&mut self) { - self.recv().await; - } - - async fn recv(&mut self) { - while let Some(message) = self.rx.recv().await { - match message { - ProofProcessorMessage::ValidateProof { response_tx } => { - // check if processing should stop because the batcher requested for it - if self.processing_stopped { - continue; - } - - *self.current_messages.lock().await += 1; - let current_messages_clone = self.current_messages.clone(); - let batch_state_clone = self.batch_state.clone(); - tokio::task::spawn(async move { - // TODO: lock user mutex check, if is being attended stop - let result = Self::handle_validate_proof_message(batch_state_clone).await; - let _ = response_tx.send(result); - *current_messages_clone.lock().await -= 1; - }); - } - ProofProcessorMessage::StopProcessing { response_tx } => { - self.processing_stopped = true; - // wait until it can lock all the mutex of the users - // we know that no new mutex would appear since we have set processing_stopped = true - for mutex in self.user_mutexes.values() { - let _ = mutex.lock().await; - } - - let _ = response_tx.send(true); - } - ProofProcessorMessage::RestartProcessing => 
self.processing_stopped = false, - } - } - } - - async fn handle_validate_proof_message( - batch_state: Arc>, - ) -> SubmitProofResponseMessage { - SubmitProofResponseMessage::InvalidMaxFee - } -} From 6cdf54e516c36b161cdee0163c4ed7b1a2c438dc Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 1 Jul 2025 16:37:12 -0300 Subject: [PATCH 11/18] feat: priority for batch building process when acquiring user locks --- crates/batcher/src/lib.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index 3814f3dc70..23d2248bce 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -683,7 +683,6 @@ impl Batcher { // If it was not present, then the user nonce is queried to the Aligned contract. // Lastly, we get a lock of the batch state again and insert the user state if it was still missing. - // Step 1: Get or insert the per-address mutex under lock let is_user_in_state: bool = { let batch_state_lock = self.batch_state.lock().await; batch_state_lock.user_states.contains_key(&addr) @@ -739,7 +738,28 @@ impl Batcher { .or_insert_with(|| Arc::new(Mutex::new(()))) .clone() }; - let _user_mutex = user_mutex.lock().await; + + // This looks very ugly but basically, we are doing the following: + // 1. We try to acquire the `user_mutex`: this can take some time if there is another task with it + // 2. While that time that passes, the batcher might have tried to build a new batch, so we check the `batch_building_mutex` + // 3. If it is taken, then release the lock so the batcher can continue building (as it is waiting for all user mutex to finish) + // 4. 
If it isn't building then continue with the message + // + // This is done to give the batcher builder process priority + // and prevent a situation where we are the batcher wants to build a new batch + // but it has to wait for a ton of messages to be processed first + // Leading to a decrease batch throughput + let _user_mutex = loop { + let _user_mutex = user_mutex.lock().await; + let res = self.batch_building_mutex.try_lock(); + if res.is_ok() { + break _user_mutex; + } else { + drop(_user_mutex); + // tell the runtime that we are done for now and continue with another task + tokio::task::yield_now().await; + } + }; debug!("User mutex for {:?} acquired...", addr_in_msg); let msg_max_fee = nonced_verification_data.max_fee; @@ -815,6 +835,11 @@ impl Batcher { // We check this after replacement logic because if user wants to replace a proof, their // new_max_fee must be greater or equal than old_max_fee + // + // Note: we don't do this before the handle_replacement_message as this operation can block for some time + // this is run again in the handle_replacement_message + // by enforcing stricter rules in replacements (a min bump + min fee) we can be sure this is run on valid message that user will actually pay for it + // and so running the pre-verification isn't free if msg_max_fee > user_last_max_fee_limit { warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}"); send_message( ws_conn_sink.clone(), @@ -826,6 +851,7 @@ impl Batcher { return Ok(()); } + // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients if let Err(e) = self.verify_proof(&nonced_verification_data).await { send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InvalidProof(e), From 26b164170bb0e07df6f4eebb65b9f333f4d86880 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 1 Jul 2025 18:12:44 -0300 Subject: [PATCH 12/18] refactor: naming, docs, logs, style --- crates/batcher/src/lib.rs | 177 ++++++++++++++++--------
crates/batcher/src/types/batch_state.rs | 32 ----- 2 files changed, 116 insertions(+), 93 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index 23d2248bce..f9383c34cb 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -84,9 +84,23 @@ pub struct Batcher { payment_service_fallback: BatcherPaymentService, service_manager: ServiceManager, service_manager_fallback: ServiceManager, + /// Holds both the user state and the proofs queue. + /// + /// We should consider splitting the user state and the queue into separate mutexes + /// to improve concurrency. batch_state: Mutex, - batch_building_mutex: Mutex<()>, - user_mutexes: Mutex>>>, + /// A mutex that signals an ongoing batch building process. + /// It remains locked until the batch has been fully built. + /// Used to synchronize the processing of proofs during batch construction. + building_batch_mutex: Mutex<()>, + /// A map of per-user mutexes used to synchronize proof processing. + /// It allows us to mutate the users state atomically, + /// while avoiding the need to lock the entire [`batch_state`] structure. + /// + /// During batch building, the process also locks these per-user mutexes + /// (after acquiring [`building_batch_mutex`]) to ensure that all ongoing + /// proof messages complete and the state remains consistent. 
+ user_proof_processing_mutexes: Mutex>>>, min_block_interval: u64, transaction_wait_timeout: u64, max_proof_size: usize, @@ -95,7 +109,6 @@ pub struct Batcher { last_uploaded_batch_block: Mutex, pre_verification_is_enabled: bool, non_paying_config: Option, - posting_batch: Mutex, disabled_verifiers: Mutex, aggregator_fee_percentage_multiplier: u128, aggregator_gas_cost: u128, @@ -275,10 +288,9 @@ impl Batcher { .batcher .aggregator_fee_percentage_multiplier, aggregator_gas_cost: config.batcher.aggregator_gas_cost, - posting_batch: Mutex::new(false), batch_state: Mutex::new(batch_state), - user_mutexes: Mutex::new(HashMap::new()), - batch_building_mutex: Mutex::new(()), + user_proof_processing_mutexes: Mutex::new(HashMap::new()), + building_batch_mutex: Mutex::new(()), disabled_verifiers: Mutex::new(disabled_verifiers), metrics, telemetry, @@ -615,11 +627,10 @@ impl Batcher { debug!("Received message with nonce: {msg_nonce:?}"); self.metrics.received_proofs.inc(); - // if this is locked, then it means that the a batch is being built - // so we need to stop the processing - debug!("Checking if a batch is being built before proceeding with the message"); - let _ = self.batch_building_mutex.lock().await; - debug!("Batch building has finished or did't started, proceeding with the message"); + // Make sure there are no batches being built before processing the message + debug!("Checking if there is an ongoing batch before processing the message..."); + let _ = self.building_batch_mutex.lock().await; + debug!("Batch building mutex acquired. 
Proceeding with message processing."); // * ---------------------------------------------------* // * Perform validations over the message * @@ -727,13 +738,13 @@ impl Batcher { return Ok(()); }; - // For now on until the message is fully processed, the batch state is locked + // For now on until the message is fully processed, the corresponding user mutex is acquired from `user_proof_processing_mutexes` // This is needed because we need to query the user state to make validations and - // finally add the proof to the batch queue. - + // finally add the proof to the batch queue which must be done individually. + // This allows us to process each user without having to lock the whole `batch_state` debug!("Trying to acquire user mutex for {:?}...", addr_in_msg); let user_mutex = { - let mut map = self.user_mutexes.lock().await; + let mut map = self.user_proof_processing_mutexes.lock().await; map.entry(addr) .or_insert_with(|| Arc::new(Mutex::new(()))) .clone() @@ -741,7 +752,7 @@ impl Batcher { // This looks very ugly but basically, we are doing the following: // 1. We try to acquire the `user_mutex`: this can take some time if there is another task with it - // 2. While that time that passes, the batcher might have tried to build a new batch, so we check the `batch_building_mutex` + // 2. While that time that passes, the batcher might have tried to build a new batch, so we check the `building_batch_mutex` // 3. If it is taken, then release the lock so the batcher can continue building (as it is waiting for all user mutex to finish) // 4. 
If it isn't building then continue with the message // @@ -751,7 +762,7 @@ impl Batcher { // Leading to a decrease batch throughput let _user_mutex = loop { let _user_mutex = user_mutex.lock().await; - let res = self.batch_building_mutex.try_lock(); + let res = self.building_batch_mutex.try_lock(); if res.is_ok() { break _user_mutex; } else { @@ -833,13 +844,6 @@ impl Batcher { return Ok(()); } - // We check this after replacement logic because if user wants to replace a proof, their - // new_max_fee must be greater or equal than old_max_fee - // - // Note: we don't do this before the handle_replacement_message as this operation can block for some time - // this is run again in the handle_replacement_message - // by enforcing stricter rules in replacements (a min bump + min fee) we can be sure this is run on valid message that user will actually pay for it - // and so running the pre-verification isn't free if msg_max_fee > user_last_max_fee_limit { warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}"); send_message( @@ -851,8 +855,20 @@ impl Batcher { return Ok(()); } - // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients - if let Err(e) = self.verify_proof(&nonced_verification_data).await { + // * ---------------------------------------------------* + // * Validate proof * + // * ---------------------------------------------------* + + // Note: While it may seem obvious to run this before `handle_replacement_message` and avoid repeating code + // We intentionally do not run this verification before `handle_replacement_message` + // because this function is "expensive" and it may block for a few milliseconds. + // + // When handling replacement message, before running the verification we make sure + // the user has bumped the fee enough so that running the pre-verification is justified. 
+ if let Err(e) = self + .verify_proof(&nonced_verification_data.verification_data) + .await + { send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InvalidProof(e), @@ -901,6 +917,7 @@ impl Batcher { batch_state_lock.update_user_state_on_entry_removal(&removed_entry); if let Some(removed_entry_ws) = removed_entry.messaging_sink { + std::mem::drop(batch_state_lock); send_message( removed_entry_ws, SubmitProofResponseMessage::UnderpricedProof, @@ -941,21 +958,67 @@ impl Batcher { return Ok(()); }; - if self + let Some(user_proof_count) = self + .batch_state + .lock() + .await + .get_user_proof_count(&addr) + .await + else { + error!("User state of address {addr} was not found when trying to update user state. This user state should have been present"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + return Ok(()); + }; + + let Some(current_total_fees_in_queue) = self .batch_state .lock() .await - .update_user_after_adding_proof(addr, msg_nonce, msg_max_fee) + .get_user_total_fees_in_queue(&addr) + .await + else { + error!("User state of address {addr} was not found when trying to update user state. This user state should have been present"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + return Ok(()); + }; + + // User state is updated + if self + .batch_state + .lock() .await - .is_err() + .update_user_state( + &addr, + msg_nonce + U256::one(), + msg_max_fee, + user_proof_count + 1, + current_total_fees_in_queue + msg_max_fee, + ) + .is_none() { + error!("User state of address {addr} was not found when trying to update user state. 
This user state should have been present"); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::AddToBatchError, ) .await; return Ok(()); - } + }; + + // Finally, we remove the mutex from the map + // + // Note: this removal is safe even if other processes are waiting on the lock + // This is because it is wrapped on an Arc so the variable will still live until all clones are dropped. + let mut user_mutexes = self.user_proof_processing_mutexes.lock().await; + user_mutexes.remove(&addr); info!("Verification data message handled"); Ok(()) @@ -1023,8 +1086,11 @@ impl Batcher { return; } - // if all went well, verify the proof - if let Err(e) = self.verify_proof(&nonced_verification_data).await { + // If all went well, verify the proof + if let Err(e) = self + .verify_proof(&nonced_verification_data.verification_data) + .await + { send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InvalidProof(e), @@ -1201,11 +1267,16 @@ impl Batcher { } /// Given a new block number listened from the blockchain, checks if the current batch is ready to be posted. + /// /// There are essentially two conditions to be checked: /// * Has the current batch reached the minimum size to be posted? /// * Has the received block number surpassed the maximum interval with respect to the last posted batch block? /// - /// Then the batch will be made as big as possible given this two conditions: + /// If both are met then: + /// * We acquire the building batch mutex to stop processing new proof messages + /// * We acquire all the users locks to wait until all current proof messages are processed + /// + /// Once we hold them, the biggest possible batch will be built, making sure that: /// * The serialized batch size needs to be smaller than the maximum batch size /// * The batch submission fee is less than the lowest `max fee` included the batch, /// * And the batch submission fee is more than the highest `max fee` not included the batch. 
@@ -1220,18 +1291,7 @@ impl Batcher { block_number: u64, gas_price: U256, ) -> Option> { - info!("Batch building: started, acquiring lock to stop processing new messages..."); - let _batch_building_mutex = self.batch_building_mutex.lock().await; - - info!("Batch building: waiting until all the ongoing messages finish"); - // acquire all the user locks to make sure all the ongoing message have been processed - for user_mutex in self.user_mutexes.lock().await.values() { - let _ = user_mutex.lock().await; - } - info!("Batch building: all locks acquired, proceeding to build batch"); - - let batch_state_lock = self.batch_state.lock().await; - let current_batch_len = batch_state_lock.batch_queue.len(); + let current_batch_len = self.batch_state.lock().await.batch_queue.len(); if current_batch_len < 1 { info!( "Current batch has {} proofs. Waiting for more proofs...", @@ -1249,17 +1309,17 @@ impl Batcher { return None; } - // Check if a batch is currently being posted - let mut batch_posting = self.posting_batch.lock().await; - if *batch_posting { - info!( - "Batch is currently being posted. Waiting for the current batch to be finalized..." 
- ); - return None; + info!("Batch building: started, acquiring lock to stop processing new messages..."); + let _building_batch_mutex = self.building_batch_mutex.lock().await; + + info!("Batch building: waiting until all the ongoing messages finish"); + // acquire all the user locks to make sure all the ongoing message have been processed + for user_mutex in self.user_proof_processing_mutexes.lock().await.values() { + let _ = user_mutex.lock().await; } + info!("Batch building: all user locks acquired, proceeding to build batch"); + let batch_state_lock = self.batch_state.lock().await; - // Set the batch posting flag to true - *batch_posting = true; let batch_queue_copy = batch_state_lock.batch_queue.clone(); let finalized_batch = batch_queue::try_build_batch( batch_queue_copy, @@ -1269,7 +1329,6 @@ impl Batcher { self.constant_gas_cost(), ) .inspect_err(|e| { - *batch_posting = false; match e { // We can't post a batch since users are not willing to pay the needed fee, wait for more proofs BatcherError::BatchCostTooHigh => { @@ -1523,10 +1582,6 @@ impl Batcher { .finalize_batch(finalized_batch, modified_gas_price) .await; - // Resetting this here to avoid doing it on every return path of `finalize_batch` function - let mut batch_posting = self.posting_batch.lock().await; - *batch_posting = false; - batch_finalization_result?; } @@ -2045,14 +2100,14 @@ impl Batcher { true } + /// Takes [`VerificationData`] and spawns a blocking task to verify the proof async fn verify_proof( &self, - nonced_verification_data: &NoncedVerificationData, + verification_data: &VerificationData, ) -> Result<(), ProofInvalidReason> { if !self.pre_verification_is_enabled { return Ok(()); } - let verification_data = &nonced_verification_data.verification_data; if self .is_verifier_disabled(verification_data.proving_system) .await diff --git a/crates/batcher/src/types/batch_state.rs b/crates/batcher/src/types/batch_state.rs index 9e60fcef18..481ca44f74 100644 --- 
a/crates/batcher/src/types/batch_state.rs +++ b/crates/batcher/src/types/batch_state.rs @@ -176,38 +176,6 @@ impl BatchState { None } - pub(crate) async fn update_user_after_adding_proof( - &mut self, - addr: Address, - nonce: U256, - max_fee: U256, - ) -> Result<(), String> { - let Some(user_proof_count) = self.get_user_proof_count(&addr).await else { - return Err("user proof count".into()); - }; - - let Some(current_total_fees_in_queue) = self.get_user_total_fees_in_queue(&addr).await - else { - return Err("user total fees in queue not found".into()); - }; - - // User state is updated - if self - .update_user_state( - &addr, - nonce + U256::one(), - max_fee, - user_proof_count + 1, - current_total_fees_in_queue + max_fee, - ) - .is_none() - { - return Err("user not found".into()); - }; - - Ok(()) - } - // LOGIC: pub(crate) fn calculate_new_user_states_data(&self) -> HashMap { From 7a7df60a3bcf8178ced079f0e4d7e33d7bb8d41f Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 1 Jul 2025 18:39:47 -0300 Subject: [PATCH 13/18] fix: clone user_proof_processing_mutexes to prevent deadlock --- crates/batcher/src/lib.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index f9383c34cb..6e71fa82aa 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -1313,8 +1313,13 @@ impl Batcher { let _building_batch_mutex = self.building_batch_mutex.lock().await; info!("Batch building: waiting until all the ongoing messages finish"); + // acquire all the user locks to make sure all the ongoing message have been processed - for user_mutex in self.user_proof_processing_mutexes.lock().await.values() { + let mutexes: Vec>> = { + let user_proofs_lock = self.user_proof_processing_mutexes.lock().await; + user_proofs_lock.values().cloned().collect() + }; + for user_mutex in mutexes { let _ = user_mutex.lock().await; } info!("Batch building: all user locks acquired, proceeding to build batch"); 
From 30b7c44ff79d83d5b517efe931b55e3cff7a92d7 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Wed, 2 Jul 2025 12:59:34 -0300 Subject: [PATCH 14/18] fix: task-sender generate proofs target --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index ee26968cfc..449e1c4528 100644 --- a/Makefile +++ b/Makefile @@ -688,7 +688,7 @@ BURST_TIME_SECS ?= 3 task_sender_generate_gnark_groth16_proofs: @cd crates/task-sender && \ cargo run --release -- generate-proofs \ - --number-of-proofs $(NUMBER_OF_PROOFS) --proof-type gnark_groth16 \ + --number-of-proofs $(NUMBER_OF_PROOFS) --proof-type groth16 \ --dir-to-save-proofs $(CURDIR)/scripts/test_files/task_sender/proofs # ===== DEVNET ===== From a46bf2c06e9b67fb65757e35b7658e34e75d9928 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Wed, 2 Jul 2025 16:23:06 -0300 Subject: [PATCH 15/18] refactor: improvements over locks and code style --- crates/batcher/src/lib.rs | 147 ++++++++++++++++++++------------------ 1 file changed, 77 insertions(+), 70 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index 6e71fa82aa..e7c452b201 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -90,7 +90,12 @@ pub struct Batcher { /// to improve concurrency. batch_state: Mutex, /// A mutex that signals an ongoing batch building process. - /// It remains locked until the batch has been fully built. + /// It remains locked until the batch has been fully built and is ready to be submitted. + /// + /// When a new proof message arrives, before processing it + /// we check that this mutex isn't locked and if it is we wait until unlocked + /// + /// This check covers the case where a new user submits a message while a batch is in construction /// Used to synchronize the processing of proofs during batch construction. building_batch_mutex: Mutex<()>, /// A map of per-user mutexes used to synchronize proof processing. 
@@ -742,7 +747,7 @@ impl Batcher { // This is needed because we need to query the user state to make validations and // finally add the proof to the batch queue which must be done individually. // This allows us to process each user without having to lock the whole `batch_state` - debug!("Trying to acquire user mutex for {:?}...", addr_in_msg); + debug!("Trying to acquire user mutex for {:?}...", addr); let user_mutex = { let mut map = self.user_proof_processing_mutexes.lock().await; map.entry(addr) @@ -757,9 +762,9 @@ impl Batcher { // 4. If it isn't building then continue with the message // // This is done to give the batcher builder process priority - // and prevent a situation where we are the batcher wants to build a new batch + // and prevent a situation where the batcher wants to build a new batch // but it has to wait for a ton of messages to be processed first - // Leading to a decrease batch throughput + // Leading to a decrease in batch throughput let _user_mutex = loop { let _user_mutex = user_mutex.lock().await; let res = self.building_batch_mutex.try_lock(); @@ -778,9 +783,11 @@ impl Batcher { let batch_state_lock = self.batch_state.lock().await; let last_max_fee = batch_state_lock.get_user_last_max_fee_limit(&addr).await; let accumulated_fee = batch_state_lock.get_user_total_fees_in_queue(&addr).await; - drop(batch_state_lock); + (last_max_fee, accumulated_fee) + }; - match (last_max_fee, accumulated_fee) { + let (user_last_max_fee_limit, user_accumulated_fee) = + match (user_last_max_fee_limit, user_accumulated_fee) { (Some(last_max), Some(accumulated)) => (last_max, accumulated), _ => { send_message( @@ -792,8 +799,7 @@ impl Batcher { self.metrics.user_error(&["batcher_state_error", ""]); return Ok(()); } - } - }; + }; if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) { send_message( @@ -844,6 +850,8 @@ impl Batcher { return Ok(()); } + // We check this after replacement logic because if user wants to replace 
a proof, their + // new_max_fee must be greater or equal than old_max_fee if msg_max_fee > user_last_max_fee_limit { warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}"); send_message( @@ -958,61 +966,6 @@ impl Batcher { return Ok(()); }; - let Some(user_proof_count) = self - .batch_state - .lock() - .await - .get_user_proof_count(&addr) - .await - else { - error!("User state of address {addr} was not found when trying to update user state. This user state should have been present"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - return Ok(()); - }; - - let Some(current_total_fees_in_queue) = self - .batch_state - .lock() - .await - .get_user_total_fees_in_queue(&addr) - .await - else { - error!("User state of address {addr} was not found when trying to update user state. This user state should have been present"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - return Ok(()); - }; - - // User state is updated - if self - .batch_state - .lock() - .await - .update_user_state( - &addr, - msg_nonce + U256::one(), - msg_max_fee, - user_proof_count + 1, - current_total_fees_in_queue + msg_max_fee, - ) - .is_none() - { - error!("User state of address {addr} was not found when trying to update user state. 
This user state should have been present"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - return Ok(()); - }; - // Finally, we remove the mutex from the map // // Note: this removal is safe even if other processes are waiting on the lock @@ -1153,7 +1106,21 @@ impl Batcher { // if they have the same nonce and sender, so we can remove the old entry // by calling remove with the new entry let mut batch_state_lock = self.batch_state.lock().await; - batch_state_lock.batch_queue.remove(&replacement_entry); + if batch_state_lock + .batch_queue + .remove(&replacement_entry) + .is_none() + { + std::mem::drop(batch_state_lock); + warn!("Replacement entry for {addr:?} was not present in batcher queue"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + return; + }; + batch_state_lock.batch_queue.push( replacement_entry.clone(), BatchQueueEntryPriority::new(replacement_max_fee, nonce), @@ -1229,7 +1196,7 @@ impl Batcher { .await } - /// Adds verification data to the current batch queue. + /// Adds verification data to the current batch queue and updates the user state async fn add_to_batch( &self, verification_data: NoncedVerificationData, @@ -1263,6 +1230,46 @@ impl Batcher { info!("Current batch queue length: {}", queue_len); + let Some(user_proof_count) = batch_state_lock + .get_user_proof_count(&proof_submitter_addr) + .await + else { + error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); + std::mem::drop(batch_state_lock); + return Err(BatcherError::AddressNotFoundInUserStates( + proof_submitter_addr, + )); + }; + + let Some(current_total_fees_in_queue) = batch_state_lock + .get_user_total_fees_in_queue(&proof_submitter_addr) + .await + else { + error!("User state of address {proof_submitter_addr} was not found when trying to update user state. 
This user state should have been present"); + std::mem::drop(batch_state_lock); + return Err(BatcherError::AddressNotFoundInUserStates( + proof_submitter_addr, + )); + }; + + // User state is updated + if batch_state_lock + .update_user_state( + &proof_submitter_addr, + nonce + U256::one(), + max_fee, + user_proof_count + 1, + current_total_fees_in_queue + max_fee, + ) + .is_none() + { + error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); + std::mem::drop(batch_state_lock); + return Err(BatcherError::AddressNotFoundInUserStates( + proof_submitter_addr, + )); + }; + Ok(()) } @@ -1312,9 +1319,7 @@ impl Batcher { info!("Batch building: started, acquiring lock to stop processing new messages..."); let _building_batch_mutex = self.building_batch_mutex.lock().await; - info!("Batch building: waiting until all the ongoing messages finish"); - - // acquire all the user locks to make sure all the ongoing message have been processed + info!("Batch building: waiting until all the user messages and proofs get processed"); let mutexes: Vec>> = { let user_proofs_lock = self.user_proof_processing_mutexes.lock().await; user_proofs_lock.values().cloned().collect() @@ -1323,9 +1328,11 @@ impl Batcher { let _ = user_mutex.lock().await; } info!("Batch building: all user locks acquired, proceeding to build batch"); - let batch_state_lock = self.batch_state.lock().await; - let batch_queue_copy = batch_state_lock.batch_queue.clone(); + let batch_queue_copy = { + let batch_state_lock = self.batch_state.lock().await; + batch_state_lock.batch_queue.clone() + }; let finalized_batch = batch_queue::try_build_batch( batch_queue_copy, gas_price, From 8837ea4253b2b5ffbe06b01c7acf2174e6f08ebc Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Thu, 3 Jul 2025 11:24:42 -0300 Subject: [PATCH 16/18] refactor: remove building_batch_mutex and sync between proof processing and batch building --- 
crates/batcher/src/lib.rs | 52 ++------------------------------------- 1 file changed, 2 insertions(+), 50 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index e7c452b201..dc2866c5bf 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -89,15 +89,7 @@ pub struct Batcher { /// We should consider splitting the user state and the queue into separate mutexes /// to improve concurrency. batch_state: Mutex, - /// A mutex that signals an ongoing batch building process. - /// It remains locked until the batch has been fully built and is ready to be submitted. - /// - /// When a new proof message arrives, before processing it - /// we check that this mutex isn't locked and if it is we wait until unlocked - /// - /// This check covers the case where a new user submits a message while a batch is in construction - /// Used to synchronize the processing of proofs during batch construction. - building_batch_mutex: Mutex<()>, + /// A map of per-user mutexes used to synchronize proof processing. /// It allows us to mutate the users state atomically, /// while avoiding the need to lock the entire [`batch_state`] structure. @@ -295,7 +287,6 @@ impl Batcher { aggregator_gas_cost: config.batcher.aggregator_gas_cost, batch_state: Mutex::new(batch_state), user_proof_processing_mutexes: Mutex::new(HashMap::new()), - building_batch_mutex: Mutex::new(()), disabled_verifiers: Mutex::new(disabled_verifiers), metrics, telemetry, @@ -632,11 +623,6 @@ impl Batcher { debug!("Received message with nonce: {msg_nonce:?}"); self.metrics.received_proofs.inc(); - // Make sure there are no batches being built before processing the message - debug!("Checking if there is an ongoing batch before processing the message..."); - let _ = self.building_batch_mutex.lock().await; - debug!("Batch building mutex acquired. 
Proceeding with message processing."); - // * ---------------------------------------------------* // * Perform validations over the message * // * ---------------------------------------------------* @@ -754,28 +740,7 @@ impl Batcher { .or_insert_with(|| Arc::new(Mutex::new(()))) .clone() }; - - // This looks very ugly but basically, we are doing the following: - // 1. We try to acquire the `user_mutex`: this can take some time if there is another task with it - // 2. While that time that passes, the batcher might have tried to build a new batch, so we check the `building_batch_mutex` - // 3. If it is taken, then release the lock so the batcher can continue building (as it is waiting for all user mutex to finish) - // 4. If it isn't building then continue with the message - // - // This is done to give the batcher builder process priority - // and prevent a situation where the batcher wants to build a new batch - // but it has to wait for a ton of messages to be processed first - // Leading to a decrease in batch throughput - let _user_mutex = loop { - let _user_mutex = user_mutex.lock().await; - let res = self.building_batch_mutex.try_lock(); - if res.is_ok() { - break _user_mutex; - } else { - drop(_user_mutex); - // tell the runtime to we are done for now and continue with another task - tokio::task::yield_now().await; - } - }; + let _user_mutex = user_mutex.lock().await; debug!("User mutex for {:?} acquired...", addr_in_msg); let msg_max_fee = nonced_verification_data.max_fee; @@ -1316,19 +1281,6 @@ impl Batcher { return None; } - info!("Batch building: started, acquiring lock to stop processing new messages..."); - let _building_batch_mutex = self.building_batch_mutex.lock().await; - - info!("Batch building: waiting until all the user messages and proofs get processed"); - let mutexes: Vec>> = { - let user_proofs_lock = self.user_proof_processing_mutexes.lock().await; - user_proofs_lock.values().cloned().collect() - }; - for user_mutex in mutexes { - let _ = 
user_mutex.lock().await; - } - info!("Batch building: all user locks acquired, proceeding to build batch"); - let batch_queue_copy = { let batch_state_lock = self.batch_state.lock().await; batch_state_lock.batch_queue.clone() From c48c27240fe7934b4053f7cdcedd614d8a7e392c Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Thu, 3 Jul 2025 11:34:01 -0300 Subject: [PATCH 17/18] docs: batcher mutexes --- crates/batcher/src/lib.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index dc2866c5bf..cdf74eb55c 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -84,19 +84,21 @@ pub struct Batcher { payment_service_fallback: BatcherPaymentService, service_manager: ServiceManager, service_manager_fallback: ServiceManager, - /// Holds both the user state and the proofs queue. + /// Shared state containing both the user data and the proofs queue. /// - /// We should consider splitting the user state and the queue into separate mutexes - /// to improve concurrency. + /// This state is accessed concurrently by the proof messages handler + /// and the batches builder. Any mutation typically consists of: + /// 1. Adding and/or removing a proof from the queue. + /// 2. Updating the corresponding user state based on 1. + /// + /// The `batch_state` lock MUST be held for the full duration of both steps + /// to ensure consistency in the state. batch_state: Mutex, - - /// A map of per-user mutexes used to synchronize proof processing. - /// It allows us to mutate the users state atomically, - /// while avoiding the need to lock the entire [`batch_state`] structure. + /// A map of per-user mutexes. /// - /// During batch building, the process also locks these per-user mutexes - /// (after acquiring [`building_batch_mutex`]) to ensure that all ongoing - /// proof messages complete and the state remains consistent. 
+ /// It allows us to synchronize the processing of proof messages + /// per user ensuring the state is mutated atomically between each user, + /// while avoiding the need to lock the entire [`batch_state`] structure. user_proof_processing_mutexes: Mutex>>>, min_block_interval: u64, transaction_wait_timeout: u64, From 96def6cd6efbf3d6cb28fd60992d6e22ff16c108 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Thu, 3 Jul 2025 11:46:36 -0300 Subject: [PATCH 18/18] refactor: comments, tracing and err messages --- crates/batcher/src/lib.rs | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index cdf74eb55c..5e25096af8 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -933,13 +933,6 @@ impl Batcher { return Ok(()); }; - // Finally, we remove the mutex from the map - // - // Note: this removal is safe even if other processes are waiting on the lock - // This is because it is wrapped on an Arc so the variable will still live until all clones are dropped. - let mut user_mutexes = self.user_proof_processing_mutexes.lock().await; - user_mutexes.remove(&addr); - info!("Verification data message handled"); Ok(()) } @@ -1072,6 +1065,10 @@ impl Batcher { // note that the entries are considered equal for the priority queue // if they have the same nonce and sender, so we can remove the old entry // by calling remove with the new entry + // + // Note: If `remove` returns `None`, it means the proof was already removed— + // likely by the batch building process running in parallel. + // This is rare but expected due to concurrent access. 
let mut batch_state_lock = self.batch_state.lock().await; if batch_state_lock .batch_queue @@ -1082,7 +1079,7 @@ impl Batcher { warn!("Replacement entry for {addr:?} was not present in batcher queue"); send_message( ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, + SubmitProofResponseMessage::InvalidReplacementMessage, ) .await; return; @@ -1241,16 +1238,11 @@ impl Batcher { } /// Given a new block number listened from the blockchain, checks if the current batch is ready to be posted. - /// /// There are essentially two conditions to be checked: /// * Has the current batch reached the minimum size to be posted? /// * Has the received block number surpassed the maximum interval with respect to the last posted batch block? /// - /// If both are met then: - /// * We acquire the building batch mutex to stop processing new proof messages - /// * We acquire all the users locks to wait until all current proof messages are processed - /// - /// Once we hold them, the biggest possible batch will be built, making sure that: + /// Then the batch will be made as big as possible given this two conditions: /// * The serialized batch size needs to be smaller than the maximum batch size /// * The batch submission fee is less than the lowest `max fee` included the batch, /// * And the batch submission fee is more than the highest `max fee` not included the batch. @@ -1283,6 +1275,7 @@ impl Batcher { return None; } + info!("Trying to build a new batch..."); let batch_queue_copy = { let batch_state_lock = self.batch_state.lock().await; batch_state_lock.batch_queue.clone()