Skip to content
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
5e21e62
Change user_last_max_fee validation
Mechanix97 Apr 16, 2025
9bc03de
verify batch limits
Mechanix97 Apr 16, 2025
27f46a9
Add try_push mock
Mechanix97 Apr 16, 2025
3dbf64a
fix warnings
Mechanix97 Apr 16, 2025
0bb312c
Change try_push_to_queue logic
Mechanix97 Apr 16, 2025
50b225b
Add remove from q logic
Mechanix97 Apr 16, 2025
72cfeb2
cargo fmt
Mechanix97 Apr 16, 2025
b037de8
add errors
Mechanix97 Apr 16, 2025
c02e861
Use DoublePriorityQueue
Mechanix97 Apr 21, 2025
d4b0c86
Fix queue len
Mechanix97 Apr 21, 2025
2d5ab6a
cargo fmt
Mechanix97 Apr 21, 2025
e8654bd
Fix format
Mechanix97 Apr 21, 2025
d500e2c
Remove unused error
Mechanix97 Apr 21, 2025
c311cd0
Modify len
Mechanix97 Apr 21, 2025
8ef8f50
Remove double q
Mechanix97 Apr 21, 2025
93209db
Merge remote-tracking branch 'origin/staging' into feat/batch_queue_l…
Mechanix97 Apr 21, 2025
31ee142
Add new error type
Mechanix97 Apr 22, 2025
e23efc7
Merge remote-tracking branch 'origin/staging' into feat/batch_queue_l…
Mechanix97 Apr 22, 2025
b389750
remove sub
Mechanix97 Apr 22, 2025
ec9870a
cargo fmt
Mechanix97 Apr 22, 2025
085d0db
Add new config param
Mechanix97 Apr 22, 2025
206098b
Add validation and update user_state
Mechanix97 Apr 22, 2025
86b2548
remove warning
Mechanix97 Apr 22, 2025
e3aac58
Add verification to non-payers
Mechanix97 Apr 22, 2025
eef1a05
fix spelling
Mechanix97 Apr 22, 2025
5695a7a
change max_proof_size to 4 MiB
Mechanix97 Apr 22, 2025
aead1d8
refactor: drop batch lock early and more readable code
MarcosNicolau Apr 28, 2025
7b92058
chore: address clippy warnings
MarcosNicolau Apr 28, 2025
c4c864b
refactor: entry removal
MarcosNicolau Apr 28, 2025
8fc52d5
chore: address clippy warnings
MarcosNicolau Apr 28, 2025
eeb557d
refactor: compare with Ordering
MarcosNicolau Apr 28, 2025
f89d7c4
fix: last_max_fee_limit
JuArce May 9, 2025
2f8df91
Simplify functions
MauroToscano May 9, 2025
23a941a
Fix conflicts
MauroToscano May 9, 2025
0e918af
Fix compile
MauroToscano May 9, 2025
cc7cc05
Add comments
MauroToscano May 9, 2025
1a706b2
fix: cargo fmt
JuArce May 9, 2025
29e87b3
fix: undo msg_max_fee > user_last_max_fee_limit changes
JuArce May 9, 2025
656243f
fix: cargo fmt
JuArce May 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions batcher/aligned-batcher/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct BatcherConfigFromYaml {
pub max_proof_size: usize,
pub max_batch_byte_size: usize,
pub max_batch_proof_qty: usize,
pub max_queue_size: usize,
pub pre_verification_is_enabled: bool,
pub metrics_port: u16,
pub telemetry_ip_port_address: String,
Expand Down
97 changes: 84 additions & 13 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use types::batch_state::BatchState;
use types::user_state::UserState;

use batch_queue::calculate_batch_size;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
Expand Down Expand Up @@ -210,7 +211,7 @@ impl Batcher {
.expect("Failed to get fallback Service Manager contract");

let mut user_states = HashMap::new();
let mut batch_state = BatchState::new();
let mut batch_state = BatchState::new(config.batcher.max_queue_size);
let non_paying_config = if let Some(non_paying_config) = config.batcher.non_paying {
warn!("Non-paying address configuration detected. Will replace non-paying address {} with configured address.",
non_paying_config.address);
Expand All @@ -228,7 +229,8 @@ impl Batcher {
non_paying_user_state,
);

batch_state = BatchState::new_with_user_states(user_states);
batch_state =
BatchState::new_with_user_states(user_states, config.batcher.max_queue_size);
Some(non_paying_config)
} else {
None
Expand Down Expand Up @@ -702,7 +704,7 @@ impl Batcher {
// This is needed because we need to query the user state to make validations and
// finally add the proof to the batch queue.

let batch_state_lock = self.batch_state.lock().await;
let mut batch_state_lock = self.batch_state.lock().await;

let msg_max_fee = nonced_verification_data.max_fee;
let Some(user_last_max_fee_limit) =
Expand All @@ -718,6 +720,18 @@ impl Batcher {
return Ok(());
};

if msg_max_fee > user_last_max_fee_limit {
std::mem::drop(batch_state_lock);
warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}");
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidMaxFee,
)
.await;
self.metrics.user_error(&["invalid_max_fee", ""]);
return Ok(());
}

let Some(user_accumulated_fee) = batch_state_lock.get_user_total_fees_in_queue(&addr).await
else {
std::mem::drop(batch_state_lock);
Expand Down Expand Up @@ -782,16 +796,63 @@ impl Batcher {
return Ok(());
}

if msg_max_fee > user_last_max_fee_limit {
std::mem::drop(batch_state_lock);
warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}");
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidMaxFee,
)
.await;
self.metrics.user_error(&["invalid_max_fee", ""]);
return Ok(());
// * ---------------------------------------------------------------------*
// * Perform validation over batcher queue *
// * ---------------------------------------------------------------------*

if batch_state_lock.is_queue_full() {
info!("Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry.");

let msg_entry_priority = BatchQueueEntryPriority::new(
nonced_verification_data.max_fee,
nonced_verification_data.nonce,
);

if let Some(lowest_entry_priority) = batch_state_lock.lowest_entry_priority() {
// If the new proof has more priority than the lowest one in the queue, discard the latter one and push the new one
if lowest_entry_priority.cmp(&msg_entry_priority) == Ordering::Greater {
let Some((removed_entry, _)) = batch_state_lock.batch_queue.pop() else {
warn!("Failed to remove lowest-priority proof despite queue being full.");
std::mem::drop(batch_state_lock);
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::BatchQueueLimitExceededError,
)
.await;
return Ok(());
};

info!(
"Incoming proof (nonce: {}, fee: {}) has higher priority. Replacing lowest priority proof from sender {} with nonce {}.",
nonced_verification_data.nonce,
nonced_verification_data.max_fee,
removed_entry.sender,
removed_entry.nonced_verification_data.nonce
);

batch_state_lock.update_user_state_on_entry_removal(&removed_entry);
if let Some(removed_entry_ws) = removed_entry.messaging_sink {
send_message(
removed_entry_ws,
SubmitProofResponseMessage::BatchQueueLimitExceededError,
)
.await;
};
} else {
warn!(
"Incoming proof (nonce: {}, fee: {}) has lower priority than all entries in the full queue. Rejecting submission.",
nonced_verification_data.nonce,
nonced_verification_data.max_fee
);
std::mem::drop(batch_state_lock);
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::BatchQueueLimitExceededError,
)
.await;
return Ok(());
}
}
}

// * ---------------------------------------------------------------------*
Expand Down Expand Up @@ -1727,6 +1788,16 @@ impl Batcher {

let batch_state_lock = self.batch_state.lock().await;

if batch_state_lock.is_queue_full() {
error!("Can't add new entry, the batcher queue is full");
send_message(
ws_sink.clone(),
SubmitProofResponseMessage::BatchQueueLimitExceededError,
)
.await;
return Ok(());
}

let nonced_verification_data = NoncedVerificationData::new(
client_msg.verification_data.verification_data.clone(),
client_msg.verification_data.nonce,
Expand Down
52 changes: 50 additions & 2 deletions batcher/aligned-batcher/src/types/batch_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::{hash_map::Entry, HashMap};

use crate::BatchQueueEntryPriority;

use super::{
batch_queue::{BatchQueue, BatchQueueEntry},
user_state::UserState,
Expand All @@ -10,22 +12,28 @@ use log::debug;
pub(crate) struct BatchState {
pub(crate) batch_queue: BatchQueue,
pub(crate) user_states: HashMap<Address, UserState>,
pub(crate) max_size: usize,
}

impl BatchState {
// CONSTRUCTORS:

pub(crate) fn new() -> Self {
pub(crate) fn new(max_size: usize) -> Self {
Self {
batch_queue: BatchQueue::new(),
user_states: HashMap::new(),
max_size,
}
}

pub(crate) fn new_with_user_states(user_states: HashMap<Address, UserState>) -> Self {
pub(crate) fn new_with_user_states(
user_states: HashMap<Address, UserState>,
max_size: usize,
) -> Self {
Self {
batch_queue: BatchQueue::new(),
user_states,
max_size,
}
}

Expand Down Expand Up @@ -214,4 +222,44 @@ impl BatchState {
&& entry.nonced_verification_data.max_fee < replacement_max_fee
})
}

/// Updates or removes a user's state when their latest proof entry is removed from the batch queue.
///
/// If the user has no other proofs remaining in the queue, their state is removed entirely.
/// Otherwise, the user's state is updated to reflect the next most recent entry in the queue.
///
/// Note: The given `removed_entry` must be the most recent (latest or highest nonce) entry for the user in the queue.
pub(crate) fn update_user_state_on_entry_removal(&mut self, removed_entry: &BatchQueueEntry) {
let addr = removed_entry.sender;

let last_max_fee_limit = match self
Comment thread
MauroToscano marked this conversation as resolved.
Outdated
.batch_queue
.iter()
.filter(|(e, _)| e.sender == addr)
.next_back()
{
Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee,
None => {
self.user_states.remove(&addr);
return;
}
};

if let Entry::Occupied(mut user_state) = self.user_states.entry(addr) {
user_state.get_mut().proofs_in_batch -= 1;
user_state.get_mut().nonce -= U256::one();
user_state.get_mut().total_fees_in_queue -= U256::one();
Comment thread
MauroToscano marked this conversation as resolved.
Outdated
user_state.get_mut().last_max_fee_limit = last_max_fee_limit;
}
}

pub(crate) fn is_queue_full(&self) -> bool {
self.batch_queue.len() >= self.max_size
Comment thread
MauroToscano marked this conversation as resolved.
}

pub(crate) fn lowest_entry_priority(&self) -> Option<BatchQueueEntryPriority> {
self.batch_queue
.peek()
.map(|(_, priority_entry)| priority_entry.clone())
}
}
4 changes: 4 additions & 0 deletions batcher/aligned-sdk/src/communication/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ async fn handle_batcher_response(msg: Message) -> Result<BatchInclusionData, Sub
);
Err(SubmitError::GenericError(e))
}
Ok(SubmitProofResponseMessage::BatchQueueLimitExceededError) => {
error!("Batcher responded with error: queue limit has been exceeded. Funds have not been spent.");
Err(SubmitError::BatchQueueLimitExceededError)
}
Err(e) => {
error!(
"Error while deserializing batch inclusion data: {}. Funds have not been spent.",
Expand Down
5 changes: 5 additions & 0 deletions batcher/aligned-sdk/src/core/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub enum SubmitError {
AddToBatchError,
InvalidProofInclusionData,
GetNonceError(String),
BatchQueueLimitExceededError,
GenericError(String),
}

Expand Down Expand Up @@ -210,6 +211,10 @@ impl fmt::Display for SubmitError {
SubmitError::InvalidProofInclusionData => {
write!(f, "Batcher responded with invalid batch inclusion data. Can't verify your proof was correctly included in the batch.")
}
SubmitError::BatchQueueLimitExceededError => {
write!(f, "Error while adding entry to batch, queue limit exeeded.")
}

SubmitError::GetNonceError(e) => write!(f, "Error while getting nonce {}", e),
}
}
Expand Down
1 change: 1 addition & 0 deletions batcher/aligned-sdk/src/core/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ pub enum SubmitProofResponseMessage {
AddToBatchError,
EthRpcError,
InvalidPaymentServiceAddress(Address, Address),
BatchQueueLimitExceededError,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
3 changes: 2 additions & 1 deletion config-files/config-batcher-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ batcher:
block_interval: 3
batch_size_interval: 10
transaction_wait_timeout: 96000 # 8 blocks
max_proof_size: 67108864 # 64 MiB
max_proof_size: 4194304 # 4 MiB
max_batch_byte_size: 268435456 # 256 MiB
max_batch_proof_qty: 3000 # 3000 proofs in a batch
max_queue_size: 10000
pre_verification_is_enabled: true
metrics_port: 9093
telemetry_ip_port_address: localhost:4001
Expand Down
3 changes: 2 additions & 1 deletion config-files/config-batcher-ethereum-package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ batcher:
block_interval: 3
batch_size_interval: 10
transaction_wait_timeout: 36000 # 3 blocks
max_proof_size: 67108864 # 64 MiB
max_proof_size: 4194304 # 4 MiB
max_batch_byte_size: 268435456 # 256 MiB
max_batch_proof_qty: 3000 # 3000 proofs in a batch
max_queue_size: 10000
pre_verification_is_enabled: true
metrics_port: 9093
telemetry_ip_port_address: localhost:4001
Expand Down
3 changes: 2 additions & 1 deletion config-files/config-batcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ batcher:
block_interval: 3
batch_size_interval: 10
transaction_wait_timeout: 96000 # 8 blocks
max_proof_size: 67108864 # 64 MiB
max_proof_size: 4194304 # 4 MiB
max_batch_byte_size: 268435456 # 256 MiB
max_batch_proof_qty: 3000 # 3000 proofs in a batch
max_queue_size: 10000
pre_verification_is_enabled: true
metrics_port: 9093
telemetry_ip_port_address: localhost:4001
Expand Down
Loading