Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config-files/config-batcher-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ batcher:
# When replacing the message, how much higher should the max fee in comparison to the original one
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
min_bump_percentage: 10
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
balance_unlock_polling_interval_seconds: 600

2 changes: 2 additions & 0 deletions config-files/config-batcher-ethereum-package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ batcher:
# When replacing the message, how much higher should the max fee in comparison to the original one
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
min_bump_percentage: 10
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
balance_unlock_polling_interval_seconds: 600

2 changes: 2 additions & 0 deletions config-files/config-batcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ batcher:
# When replacing the message, how much higher should the max fee in comparison to the original one
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
min_bump_percentage: 10
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
balance_unlock_polling_interval_seconds: 600
Comment thread
JuArce marked this conversation as resolved.
1 change: 1 addition & 0 deletions crates/batcher/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct BatcherConfigFromYaml {
pub non_paying: Option<NonPayingConfigFromYaml>,
pub amount_of_proofs_for_min_max_fee: usize,
pub min_bump_percentage: u64,
pub balance_unlock_polling_interval_seconds: u64,
}

#[derive(Debug, Deserialize)]
Expand Down
256 changes: 251 additions & 5 deletions crates/batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use eth::utils::{calculate_bumped_gas_price, get_batcher_signer, get_gas_price};
use ethers::contract::ContractError;
use ethers::signers::Signer;
use retry::batcher_retryables::{
cancel_create_new_task_retryable, create_new_task_retryable, get_user_balance_retryable,
get_user_nonce_from_ethereum_retryable, simulate_create_new_task_retryable,
user_balance_is_unlocked_retryable,
cancel_create_new_task_retryable, create_new_task_retryable,
get_current_block_number_retryable, get_user_balance_retryable,
get_user_nonce_from_ethereum_retryable, query_balance_unlocked_events_retryable,
simulate_create_new_task_retryable, user_balance_is_unlocked_retryable,
};
use retry::{retry_function, RetryError};
use tokio::time::{timeout, Instant};
Expand Down Expand Up @@ -39,8 +40,8 @@ use aligned_sdk::common::types::{

use aws_sdk_s3::client::Client as S3Client;
use eth::payment_service::{BatcherPaymentService, CreateNewTaskFeeParams, SignerMiddlewareT};
use ethers::prelude::{Middleware, Provider};
use ethers::types::{Address, Signature, TransactionReceipt, U256};
use ethers::prelude::{Http, Middleware, Provider};
use ethers::types::{Address, Signature, TransactionReceipt, U256, U64};
use futures_util::{future, join, SinkExt, StreamExt, TryStreamExt};
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend;
Expand Down Expand Up @@ -86,6 +87,8 @@ pub struct Batcher {
eth_ws_url_fallback: String,
batcher_signer: Arc<SignerMiddlewareT>,
batcher_signer_fallback: Arc<SignerMiddlewareT>,
eth_http_provider: Provider<Http>,
eth_http_provider_fallback: Provider<Http>,
chain_id: U256,
payment_service: BatcherPaymentService,
payment_service_fallback: BatcherPaymentService,
Expand All @@ -103,6 +106,7 @@ pub struct Batcher {
current_min_max_fee: RwLock<U256>,
amount_of_proofs_for_min_max_fee: usize,
min_bump_percentage: U256,
balance_unlock_polling_interval_seconds: u64,

// Shared state access:
// Two kinds of threads interact with the shared state:
Expand Down Expand Up @@ -315,6 +319,8 @@ impl Batcher {
eth_ws_url_fallback: config.eth_ws_url_fallback,
batcher_signer,
batcher_signer_fallback,
eth_http_provider,
eth_http_provider_fallback,
chain_id,
payment_service,
payment_service_fallback,
Expand All @@ -327,6 +333,9 @@ impl Batcher {
max_batch_proof_qty: config.batcher.max_batch_proof_qty,
amount_of_proofs_for_min_max_fee: config.batcher.amount_of_proofs_for_min_max_fee,
min_bump_percentage: U256::from(config.batcher.min_bump_percentage),
balance_unlock_polling_interval_seconds: config
.batcher
.balance_unlock_polling_interval_seconds,
last_uploaded_batch_block: Mutex::new(last_uploaded_batch_block),
pre_verification_is_enabled: config.batcher.pre_verification_is_enabled,
non_paying_config,
Expand Down Expand Up @@ -491,6 +500,243 @@ impl Batcher {
.map_err(|e| e.inner())
}

/// Poll for BalanceUnlocked events from BatcherPaymentService contract.
/// Runs at configurable intervals and checks recent blocks for events (2x the polling interval).
/// When an event is detected, removes user's proofs from queue and resets UserState.
pub async fn poll_balance_unlocked_events(self: Arc<Self>) -> Result<(), BatcherError> {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
self.balance_unlock_polling_interval_seconds,
));

loop {
interval.tick().await;

if let Err(e) = self.process_balance_unlocked_events().await {
error!("Error processing BalanceUnlocked events: {:?}", e);
// Continue polling even if there's an error
}
}
}

async fn process_balance_unlocked_events(&self) -> Result<(), BatcherError> {
// Get current block number using HTTP providers
let current_block = match self.get_current_block_number().await {
Ok(block) => block,
Err(e) => {
warn!("Failed to get current block number: {:?}", e);
return Ok(());
}
};
Comment thread
JuArce marked this conversation as resolved.
Outdated

// Calculate the block range based on polling interval
// Formula: interval / 12 * 2 (assuming 12-second block times, look back 2x the interval)
let block_range = (self.balance_unlock_polling_interval_seconds / 12) * 2;
let from_block = current_block.saturating_sub(U64::from(block_range));

// Query events with retry logic
let events = match self
.query_balance_unlocked_events(from_block, current_block)
.await
{
Ok(events) => events,
Err(e) => {
warn!(
"Failed to query BalanceUnlocked events after retries: {:?}",
e
);
return Ok(());
}
};
Comment thread
JuArce marked this conversation as resolved.
Outdated

info!(
"Found {} BalanceUnlocked events in blocks {} to {}",
events.len(),
from_block,
current_block
);

// Process each event
for event in events {
let user_address = event.user;
info!(
Comment thread
JuArce marked this conversation as resolved.
Outdated
"Processing BalanceUnlocked event for user: {:?}",
user_address
);

// Check if user has proofs in queue
if self.user_has_proofs_in_queue(user_address).await {
info!(
Comment thread
JuArce marked this conversation as resolved.
Outdated
"User {:?} has proofs in queue, verifying funds are still unlocked",
user_address
);

// Double-check that funds are still unlocked by calling the contract
Comment thread
JuArce marked this conversation as resolved.
Outdated
if self.user_balance_is_unlocked(&user_address).await {
info!("User {:?} funds confirmed unlocked, removing proofs and resetting UserState", user_address);
Comment thread
JuArce marked this conversation as resolved.
Outdated
self.remove_user_proofs_and_reset_state(user_address).await;
} else {
info!(
Comment thread
JuArce marked this conversation as resolved.
Outdated
"User {:?} funds are now locked, ignoring stale unlock event",
user_address
);
}
} else {
info!(
Comment thread
JuArce marked this conversation as resolved.
"User {:?} has no proofs in queue, ignoring event",
user_address
);
}
}

Ok(())
}

/// Gets the current block number from Ethereum.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
async fn get_current_block_number(&self) -> Result<U64, BatcherError> {
retry_function(
|| {
get_current_block_number_retryable(
&self.eth_http_provider,
&self.eth_http_provider_fallback,
)
},
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| {
error!("Failed to get current block number: {:?}", e);
Comment thread
JuArce marked this conversation as resolved.
Outdated
BatcherError::EthereumProviderError(e.inner())
})
}

/// Queries BalanceUnlocked events from the BatcherPaymentService contract.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
async fn query_balance_unlocked_events(
&self,
from_block: U64,
to_block: U64,
) -> Result<Vec<aligned_sdk::eth::batcher_payment_service::BalanceUnlockedFilter>, BatcherError>
{
retry_function(
|| {
query_balance_unlocked_events_retryable(
&self.payment_service,
&self.payment_service_fallback,
from_block,
to_block,
)
},
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| {
error!("Failed to query BalanceUnlocked events: {:?}", e);
Comment thread
JuArce marked this conversation as resolved.
Outdated
BatcherError::EthereumProviderError(e.inner())
})
}

async fn user_has_proofs_in_queue(&self, user_address: Address) -> bool {
let user_states = self.user_states.read().await;
if let Some(user_state) = user_states.get(&user_address) {
if let Some(user_state_guard) = self
.try_user_lock_with_timeout(user_address, user_state.lock())
.await
{
user_state_guard.proofs_in_batch > 0
} else {
false
}
} else {
false
}
}

async fn remove_user_proofs_and_reset_state(&self, user_address: Address) {
// Follow locking rules: acquire user_states before batch_state to avoid deadlocks
let user_states = self.user_states.write().await;

// Use timeout for batch lock
let batch_state_guard = match self
.try_batch_lock_with_timeout(self.batch_state.lock())
Comment thread
JuArce marked this conversation as resolved.
Outdated
.await
{
Some(guard) => guard,
None => {
warn!(
"Failed to acquire batch lock for user {:?}, skipping removal",
Comment thread
JuArce marked this conversation as resolved.
Outdated
user_address
);
return;
}
};

let mut batch_state_guard = batch_state_guard;

// Process all entries for this user
while let Some(entry) = batch_state_guard
.batch_queue
.iter()
.find(|(entry, _)| entry.sender == user_address)
.map(|(entry, _)| entry.clone())
Comment thread
JuArce marked this conversation as resolved.
Outdated
{
// Notify user via websocket before removing the proof
if let Some(ws_sink) = entry.messaging_sink.as_ref() {
send_message(
ws_sink.clone(),
SubmitProofResponseMessage::UserFundsUnlocked,
)
.await;
Comment thread
JuArce marked this conversation as resolved.
Outdated

// Close websocket connection
let mut sink_guard = ws_sink.write().await;
if let Err(e) = sink_guard.close().await {
warn!(
"Error closing websocket for user {:?}: {:?}",
user_address, e
);
} else {
info!("Closed websocket connection for user {:?}", user_address);
}
}

// Remove the entry from batch queue
batch_state_guard.batch_queue.remove(&entry);
info!(
"Removed proof with nonce {} for user {:?} from batch queue",
entry.nonced_verification_data.nonce, user_address
);
}

// Reset UserState using timeout
if let Some(user_state) = user_states.get(&user_address) {
if let Some(mut user_state_guard) = self
.try_user_lock_with_timeout(user_address, user_state.lock())
.await
{
let proofs_count = user_state_guard.proofs_in_batch;
user_state_guard.nonce -= U256::from(proofs_count);
user_state_guard.proofs_in_batch = 0;
user_state_guard.total_fees_in_queue = U256::zero();
user_state_guard.last_max_fee_limit = U256::max_value();
info!("Reset UserState for user {:?}", user_address);
} else {
Comment thread
JuArce marked this conversation as resolved.
Outdated
warn!(
"Failed to acquire user lock for {:?}, skipping UserState reset",
user_address
);
}
}
}

pub async fn listen_new_blocks_retryable(
self: Arc<Self>,
) -> Result<(), RetryError<BatcherError>> {
Expand Down
10 changes: 10 additions & 0 deletions crates/batcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ async fn main() -> Result<(), BatcherError> {
}
});

// spawn task to poll for BalanceUnlocked events
tokio::spawn({
let app = batcher.clone();
async move {
app.poll_balance_unlocked_events()
.await
.expect("Error polling BalanceUnlocked events")
}
});

batcher.metrics.inc_batcher_restart();

batcher.listen_connections(&address).await?;
Expand Down
Loading
Loading