Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config-files/config-batcher-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ batcher:
# When replacing a message, how much higher the new max fee must be compared to the original one
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
min_bump_percentage: 10
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
balance_unlock_polling_interval_seconds: 600

2 changes: 2 additions & 0 deletions config-files/config-batcher-ethereum-package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ batcher:
# When replacing a message, how much higher the new max fee must be compared to the original one
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
min_bump_percentage: 10
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
balance_unlock_polling_interval_seconds: 600

2 changes: 2 additions & 0 deletions config-files/config-batcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ batcher:
# When replacing a message, how much higher the new max fee must be compared to the original one
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
min_bump_percentage: 10
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
balance_unlock_polling_interval_seconds: 600
Comment thread
JuArce marked this conversation as resolved.
1 change: 1 addition & 0 deletions crates/batcher/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct BatcherConfigFromYaml {
pub non_paying: Option<NonPayingConfigFromYaml>,
pub amount_of_proofs_for_min_max_fee: usize,
pub min_bump_percentage: u64,
pub balance_unlock_polling_interval_seconds: u64,
}

#[derive(Debug, Deserialize)]
Expand Down
212 changes: 207 additions & 5 deletions crates/batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use eth::utils::{calculate_bumped_gas_price, get_batcher_signer, get_gas_price};
use ethers::contract::ContractError;
use ethers::signers::Signer;
use retry::batcher_retryables::{
cancel_create_new_task_retryable, create_new_task_retryable, get_user_balance_retryable,
get_user_nonce_from_ethereum_retryable, simulate_create_new_task_retryable,
user_balance_is_unlocked_retryable,
cancel_create_new_task_retryable, create_new_task_retryable,
get_current_block_number_retryable, get_user_balance_retryable,
get_user_nonce_from_ethereum_retryable, query_balance_unlocked_events_retryable,
simulate_create_new_task_retryable, user_balance_is_unlocked_retryable,
};
use retry::{retry_function, RetryError};
use tokio::time::{timeout, Instant};
Expand Down Expand Up @@ -39,8 +40,8 @@ use aligned_sdk::common::types::{

use aws_sdk_s3::client::Client as S3Client;
use eth::payment_service::{BatcherPaymentService, CreateNewTaskFeeParams, SignerMiddlewareT};
use ethers::prelude::{Middleware, Provider};
use ethers::types::{Address, Signature, TransactionReceipt, U256};
use ethers::prelude::{Http, Middleware, Provider};
use ethers::types::{Address, Signature, TransactionReceipt, U256, U64};
use futures_util::{future, join, SinkExt, StreamExt, TryStreamExt};
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend;
Expand Down Expand Up @@ -86,6 +87,8 @@ pub struct Batcher {
eth_ws_url_fallback: String,
batcher_signer: Arc<SignerMiddlewareT>,
batcher_signer_fallback: Arc<SignerMiddlewareT>,
eth_http_provider: Provider<Http>,
eth_http_provider_fallback: Provider<Http>,
chain_id: U256,
payment_service: BatcherPaymentService,
payment_service_fallback: BatcherPaymentService,
Expand All @@ -103,6 +106,7 @@ pub struct Batcher {
current_min_max_fee: RwLock<U256>,
amount_of_proofs_for_min_max_fee: usize,
min_bump_percentage: U256,
balance_unlock_polling_interval_seconds: u64,

// Shared state access:
// Two kinds of threads interact with the shared state:
Expand Down Expand Up @@ -315,6 +319,8 @@ impl Batcher {
eth_ws_url_fallback: config.eth_ws_url_fallback,
batcher_signer,
batcher_signer_fallback,
eth_http_provider,
eth_http_provider_fallback,
chain_id,
payment_service,
payment_service_fallback,
Expand All @@ -327,6 +333,9 @@ impl Batcher {
max_batch_proof_qty: config.batcher.max_batch_proof_qty,
amount_of_proofs_for_min_max_fee: config.batcher.amount_of_proofs_for_min_max_fee,
min_bump_percentage: U256::from(config.batcher.min_bump_percentage),
balance_unlock_polling_interval_seconds: config
.batcher
.balance_unlock_polling_interval_seconds,
last_uploaded_batch_block: Mutex::new(last_uploaded_batch_block),
pre_verification_is_enabled: config.batcher.pre_verification_is_enabled,
non_paying_config,
Expand Down Expand Up @@ -491,6 +500,199 @@ impl Batcher {
.map_err(|e| e.inner())
}

/// Poll for BalanceUnlocked events from BatcherPaymentService contract.
/// Runs at configurable intervals and checks recent blocks for events (2x the polling interval).
/// When an event is detected, removes user's proofs from queue and resets UserState.
pub async fn poll_balance_unlocked_events(self: Arc<Self>) -> Result<(), BatcherError> {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
self.balance_unlock_polling_interval_seconds,
));

loop {
interval.tick().await;

if let Err(e) = self.process_balance_unlocked_events().await {
error!("Error processing BalanceUnlocked events: {:?}", e);
// Continue polling even if there's an error
}
}
}

/// Runs a single polling round for `BalanceUnlocked` events.
///
/// Fetches the current block number, queries the events emitted in the most recent blocks
/// and, for every user in those events that still has proofs in the queue and whose funds
/// are still unlocked, removes the user's proofs and resets their `UserState`.
///
/// The queried window ends at the current block and spans twice the polling interval,
/// converted to blocks assuming 12-second block times and rounded up.
///
/// # Errors
/// Returns `BatcherError::EthereumProviderError` when the current block number or the
/// events cannot be retrieved.
async fn process_balance_unlocked_events(&self) -> Result<(), BatcherError> {
    // Block time assumed when converting the polling interval into a number of blocks.
    const ASSUMED_BLOCK_TIME_SECONDS: u64 = 12;

    // Get current block number using HTTP providers
    let current_block = self.get_current_block_number().await.map_err(|e| {
        BatcherError::EthereumProviderError(format!(
            "Failed to get current block number: {:?}",
            e
        ))
    })?;

    // Look back 2x the polling interval. Multiply before dividing and round up: dividing
    // first truncates, which gives a zero-block window for intervals shorter than one
    // block time and under-covers intervals that are not a multiple of it.
    let lookback_seconds = self
        .balance_unlock_polling_interval_seconds
        .saturating_mul(2);
    let block_range =
        lookback_seconds.saturating_add(ASSUMED_BLOCK_TIME_SECONDS - 1) / ASSUMED_BLOCK_TIME_SECONDS;
    let from_block = current_block.saturating_sub(U64::from(block_range));

    // Query events with retry logic
    let events = self
        .query_balance_unlocked_events(from_block, current_block)
        .await
        .map_err(|e| {
            BatcherError::EthereumProviderError(format!(
                "Failed to query BalanceUnlocked events: {:?}",
                e
            ))
        })?;

    info!(
        "Found {} BalanceUnlocked events in blocks {} to {}",
        events.len(),
        from_block,
        current_block
    );

    // Process each event
    for event in events {
        let user_address = event.user;
        debug!(
            "Processing BalanceUnlocked event for user: {:?}",
            user_address
        );

        // Check if user has proofs in queue, then double-check that funds are still
        // unlocked by calling the contract. This is necessary because we query events
        // over a block range, and the user's state may have changed (e.g., funds could be
        // locked again) after the event was emitted. Verifying on-chain ensures we don't
        // act on stale data.
        if self.user_has_proofs_in_queue(user_address).await
            && self.user_balance_is_unlocked(&user_address).await
        {
            info!(
                "User {:?} has proofs in queue and funds are unlocked, proceeding to remove proofs and resetting UserState",
                user_address
            );
            self.remove_user_proofs_and_reset_state(user_address).await;
        }
    }

    Ok(())
}

/// Fetches the current Ethereum block number through the HTTP providers.
///
/// Recoverable errors are retried with exponential backoff, up to
/// `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0.5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
async fn get_current_block_number(&self) -> Result<U64, RetryError<String>> {
    // One attempt: hands both the main and the fallback provider to the retryable.
    let fetch_block_number = || {
        get_current_block_number_retryable(
            &self.eth_http_provider,
            &self.eth_http_provider_fallback,
        )
    };

    retry_function(
        fetch_block_number,
        ETHEREUM_CALL_MIN_RETRY_DELAY,
        ETHEREUM_CALL_BACKOFF_FACTOR,
        ETHEREUM_CALL_MAX_RETRIES,
        ETHEREUM_CALL_MAX_RETRY_DELAY,
    )
    .await
}

/// Fetches the `BalanceUnlocked` events of the BatcherPaymentService contract for the
/// given `from_block`/`to_block` range.
///
/// Recoverable errors are retried with exponential backoff, up to
/// `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0.5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
async fn query_balance_unlocked_events(
    &self,
    from_block: U64,
    to_block: U64,
) -> Result<
    Vec<aligned_sdk::eth::batcher_payment_service::BalanceUnlockedFilter>,
    RetryError<String>,
> {
    // One attempt: hands both the main and the fallback payment service to the retryable.
    let fetch_events = || {
        query_balance_unlocked_events_retryable(
            &self.payment_service,
            &self.payment_service_fallback,
            from_block,
            to_block,
        )
    };

    retry_function(
        fetch_events,
        ETHEREUM_CALL_MIN_RETRY_DELAY,
        ETHEREUM_CALL_BACKOFF_FACTOR,
        ETHEREUM_CALL_MAX_RETRIES,
        ETHEREUM_CALL_MAX_RETRY_DELAY,
    )
    .await
}

/// Tells whether `user_address` currently has at least one proof in the batch.
///
/// Returns `false` when the user has no `UserState` entry, or when the user's lock
/// could not be acquired through `try_user_lock_with_timeout`.
async fn user_has_proofs_in_queue(&self, user_address: Address) -> bool {
    let states = self.user_states.read().await;

    // Unknown user: nothing can be queued for them.
    let state = match states.get(&user_address) {
        Some(state) => state,
        None => return false,
    };

    match self
        .try_user_lock_with_timeout(user_address, state.lock())
        .await
    {
        Some(locked_state) => locked_state.proofs_in_batch > 0,
        None => false,
    }
}

async fn remove_user_proofs_and_reset_state(&self, user_address: Address) {
// Use timeout for batch lock
let mut batch_state_guard = match self
.try_batch_lock_with_timeout(self.batch_state.lock())
Comment thread
JuArce marked this conversation as resolved.
Outdated
.await
{
Some(guard) => guard,
None => {
warn!(
"Failed to acquire batch lock for user {:?}, skipping removal",
Comment thread
JuArce marked this conversation as resolved.
Outdated
user_address
);
return;
}
};

let removed_entries = batch_state_guard
.batch_queue
.extract_if(|entry, _| entry.sender == user_address);

// Notify user via websocket before removing the proofs
Comment thread
MauroToscano marked this conversation as resolved.
Outdated
for (entry, _) in removed_entries {
if let Some(ws_sink) = entry.messaging_sink.as_ref() {
send_message(
ws_sink.clone(),
SubmitProofResponseMessage::UserFundsUnlocked,
)
.await;
Comment thread
JuArce marked this conversation as resolved.
Outdated
// Close websocket connection
let mut sink_guard = ws_sink.write().await;
if let Err(e) = sink_guard.close().await {
warn!(
"Error closing websocket for user {:?}: {:?}",
user_address, e
);
} else {
info!("Closed websocket connection for user {:?}", user_address);
}
}
info!(
"Removed proof with nonce {} for user {:?} from batch queue",
entry.nonced_verification_data.nonce, user_address
);
}

// Remove UserState entry
self.user_states.write().await.remove(&user_address);
info!(
"Removed UserState entry for user {:?} after processing BalanceUnlocked event",
user_address
);
}

pub async fn listen_new_blocks_retryable(
self: Arc<Self>,
) -> Result<(), RetryError<BatcherError>> {
Expand Down
10 changes: 10 additions & 0 deletions crates/batcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ async fn main() -> Result<(), BatcherError> {
}
});

// spawn task to poll for BalanceUnlocked events
tokio::spawn({
let app = batcher.clone();
async move {
app.poll_balance_unlocked_events()
.await
.expect("Error polling BalanceUnlocked events")
}
});

batcher.metrics.inc_batcher_restart();

batcher.listen_connections(&address).await?;
Expand Down
45 changes: 45 additions & 0 deletions crates/batcher/src/retry/batcher_retryables.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use ethers::prelude::*;
use ethers::providers::Http;
use log::{info, warn};
use tokio::time::timeout;

Expand Down Expand Up @@ -285,3 +286,47 @@ pub async fn cancel_create_new_task_retryable(
"Receipt not found".to_string(),
)))
}

/// Asks the main HTTP provider for the current block number and, if that call fails,
/// asks the fallback provider instead.
///
/// A failure of the main provider is not logged. A failure of the fallback provider is
/// logged and returned as `RetryError::Transient`.
pub async fn get_current_block_number_retryable(
    eth_http_provider: &Provider<Http>,
    eth_http_provider_fallback: &Provider<Http>,
) -> Result<U64, RetryError<String>> {
    // Main provider first.
    if let Ok(latest_block) = eth_http_provider.get_block_number().await {
        return Ok(latest_block);
    }

    // Main provider failed: try the fallback and report its error if it fails too.
    match eth_http_provider_fallback.get_block_number().await {
        Ok(latest_block) => Ok(latest_block),
        Err(e) => {
            warn!("Failed to get current block number: {e}");
            Err(RetryError::Transient(e.to_string()))
        }
    }
}

/// Queries `BalanceUnlocked` events restricted to the given `from_block`/`to_block` range.
///
/// The main payment service is queried first; if that query fails, the same query is
/// issued against the fallback payment service. A failure of the main service is not
/// logged. A failure of the fallback is logged and returned as `RetryError::Transient`.
pub async fn query_balance_unlocked_events_retryable(
    payment_service: &BatcherPaymentService,
    payment_service_fallback: &BatcherPaymentService,
    from_block: U64,
    to_block: U64,
) -> Result<Vec<aligned_sdk::eth::batcher_payment_service::BalanceUnlockedFilter>, RetryError<String>>
{
    // Builds the same block-bounded event filter for whichever service it is given.
    let events_in_range = |service: &BatcherPaymentService| {
        service
            .balance_unlocked_filter()
            .from_block(from_block)
            .to_block(to_block)
    };

    let primary_query = events_in_range(payment_service);
    if let Ok(found) = primary_query.query().await {
        return Ok(found);
    }

    let fallback_query = events_in_range(payment_service_fallback);
    match fallback_query.query().await {
        Ok(found) => Ok(found),
        Err(e) => {
            warn!("Failed to query BalanceUnlocked events: {e}");
            Err(RetryError::Transient(e.to_string()))
        }
    }
}
4 changes: 4 additions & 0 deletions crates/batcher/src/types/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub enum BatcherError {
WsSinkEmpty,
AddressNotFoundInUserStates(Address),
QueueRemoveError(String),
EthereumProviderError(String),
}

impl From<tungstenite::Error> for BatcherError {
Expand Down Expand Up @@ -147,6 +148,9 @@ impl fmt::Debug for BatcherError {
BatcherError::QueueRemoveError(e) => {
write!(f, "Error while removing entry from queue: {}", e)
}
BatcherError::EthereumProviderError(e) => {
write!(f, "Ethereum provider error: {}", e)
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/sdk/src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub enum SubmitError {
GetNonceError(String),
BatchQueueLimitExceededError,
GenericError(String),
UserFundsUnlocked,
}

impl From<tokio_tungstenite::tungstenite::Error> for SubmitError {
Expand Down Expand Up @@ -216,6 +217,10 @@ impl fmt::Display for SubmitError {
}

SubmitError::GetNonceError(e) => write!(f, "Error while getting nonce {}", e),
SubmitError::UserFundsUnlocked => write!(
f,
"User funds have been unlocked and proofs removed from queue"
),
}
}
}
Expand Down
Loading
Loading