Skip to content
Merged
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 2 additions & 36 deletions crates/batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ pub struct Batcher {
/// needs to be stopped, and all user_states locks need to be taken
batch_state: Mutex<BatchState>,

/// Flag to indicate when recovery is in progress
/// When true, message handlers will return ServerBusy responses
/// It's used a way to "lock" all the user_states at the same time
/// If one needed is taken in the handle message it will time out
is_recovering_from_submission_failure: RwLock<bool>,
user_states: Arc<RwLock<HashMap<Address, Arc<Mutex<UserState>>>>>,

last_uploaded_batch_block: Mutex<u64>,
Expand Down Expand Up @@ -328,7 +323,6 @@ impl Batcher {
batch_state: Mutex::new(batch_state),
user_states,
disabled_verifiers: Mutex::new(disabled_verifiers),
is_recovering_from_submission_failure: RwLock::new(false),
metrics,
telemetry,
}
Expand Down Expand Up @@ -702,17 +696,6 @@ impl Batcher {
mut address: Address,
ws_conn_sink: WsMessageSink,
) -> Result<(), Error> {
// Check if restoration is in progress
if *self.is_recovering_from_submission_failure.read().await {
warn!(
"Rejecting nonce request from {} during restoration",
address
);
let response = GetNonceResponseMessage::ServerBusy;
send_message(ws_conn_sink, response).await;
return Ok(());
}

// If the address is not paying, we will return the nonce of the aligned_payment_address
if !self.has_to_pay(&address) {
info!("Handling nonpaying message");
Expand Down Expand Up @@ -797,21 +780,6 @@ impl Batcher {
debug!("Received message with nonce: {msg_nonce:?}");
self.metrics.received_proofs.inc();

// Check if restoration is in progress
if *self.is_recovering_from_submission_failure.read().await {
warn!(
"Rejecting proof submission from {} during restoration (nonce: {})",
client_msg
.verification_data
.verification_data
.proof_generator_addr,
msg_nonce
);
let response = SubmitProofResponseMessage::ServerBusy;
send_message(ws_conn_sink, response).await;
return Ok(());
}

// * ---------------------------------------------------*
// * Perform validations over the message *
// * ---------------------------------------------------*
Expand Down Expand Up @@ -1619,9 +1587,7 @@ impl Batcher {
failed_batch.len()
);

// Set restoration flag to stop handling new user messages
*self.is_recovering_from_submission_failure.write().await = true;

let user_states_lock = self.user_states.blocking_write();
let mut batch_state_lock = self.batch_state.lock().await;
let mut restored_entries = Vec::new();

Expand Down Expand Up @@ -1687,8 +1653,8 @@ impl Batcher {
// Only auxiliary user data (max_min_fee) can be "inconsistent"
// but we can keep updating it without locking the queue
info!("Queue recovered from submission failure, resuming user processing and updating user states metadata");
std::mem::drop(user_states_lock);
std::mem::drop(batch_state_lock);
*self.is_recovering_from_submission_failure.write().await = false;

info!("Updating user states after proof restoration...");
if let Err(e) = self
Expand Down
Loading