Skip to content

Commit c02e861

Browse files
committed
Use DoublePriorityQueue
1 parent b037de8 commit c02e861

3 files changed

Lines changed: 20 additions & 61 deletions

File tree

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,8 @@ batcher_send_sp1_task:
393393
--vm_program ../../scripts/test_files/sp1/sp1_fibonacci_4_1_3.elf \
394394
--proof_generator_addr 0x66f9664f97F2b50F62D13eA064982f936dE76657 \
395395
--rpc_url $(RPC_URL) \
396+
--max_fee $(MAX_FEE) \
397+
--private_key 0x224b7eb7449992aac96d631d9677f7bf5888245eef6d6eeda31e62d2f29a83e4 \
396398
--network $(NETWORK)
397399

398400
batcher_send_sp1_burst:
@@ -418,6 +420,7 @@ batcher_send_risc0_task:
418420
--vm_program ../../scripts/test_files/risc_zero/fibonacci_proof_generator/fibonacci_id_2_0.bin \
419421
--public_input ../../scripts/test_files/risc_zero/fibonacci_proof_generator/risc_zero_fibonacci_2_0.pub \
420422
--proof_generator_addr 0x66f9664f97F2b50F62D13eA064982f936dE76657 \
423+
--private_key 0xc7f9f3b5c5a35632e8a8720e84b39d3987bfb01dbfd305bf359d9bb891ec5596 \
421424
--rpc_url $(RPC_URL) \
422425
--network $(NETWORK)
423426

batcher/aligned-batcher/src/lib.rs

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tokio::time::{timeout, Instant};
1616
use types::batch_state::BatchState;
1717
use types::user_state::UserState;
1818

19-
use batch_queue::{calculate_batch_size, get_lowest_priority_entry};
19+
use batch_queue::calculate_batch_size;
2020
use std::collections::HashMap;
2121
use std::env;
2222
use std::net::SocketAddr;
@@ -1050,6 +1050,8 @@ impl Batcher {
10501050
let max_fee = verification_data.max_fee;
10511051
let nonce = verification_data.nonce;
10521052

1053+
info!("ME llega max_fee {}", max_fee);
1054+
10531055
let batch_queue_len = batch_state_lock.batch_queue.len();
10541056

10551057
let new_entry = BatchQueueEntry::new(
@@ -1061,44 +1063,19 @@ impl Batcher {
10611063
);
10621064
let new_entry_priority = BatchQueueEntryPriority::new(max_fee, nonce);
10631065

1064-
if batch_queue_len + 1 > self.max_batch_proof_qty {
1065-
info!("Batch queue is full. Trying to remove an entry...");
1066-
let (lowest_priority_entry, lowest_priority_entry_priority) =
1067-
get_lowest_priority_entry(&batch_state_lock.batch_queue).unwrap();
1068-
1069-
if lowest_priority_entry_priority < new_entry_priority {
1070-
if batch_state_lock
1071-
.batch_queue
1072-
.remove(&lowest_priority_entry)
1073-
.is_none()
1074-
{
1075-
// If this happens, we have a bug in our code
1076-
error!("Some proofs were not found in the queue. This should not happen.");
1077-
std::mem::drop(batch_state_lock);
1078-
return Err(BatcherError::QueueRemoveError(
1079-
"Some proofs were not found in the queue".into(),
1080-
));
1081-
}
1082-
info!("Removed entry from batch queue.");
1066+
batch_state_lock
1067+
.batch_queue
1068+
.push(new_entry, new_entry_priority);
10831069

1070+
// if max batch qty exceded, remove least priority proof
1071+
if batch_queue_len > self.max_batch_proof_qty {
1072+
if let Some(lowest_priority_entry) = batch_state_lock.batch_queue.pop_min() {
10841073
send_message(
1085-
lowest_priority_entry.messaging_sink.unwrap(),
1074+
lowest_priority_entry.0.messaging_sink.unwrap(),
10861075
SubmitProofResponseMessage::AddToBatchError,
10871076
)
10881077
.await;
1089-
1090-
batch_state_lock
1091-
.batch_queue
1092-
.push(new_entry, new_entry_priority);
1093-
} else {
1094-
error!("Queue is full and fee isn't enough");
1095-
std::mem::drop(batch_state_lock);
1096-
return Err(BatcherError::BatchQueueIsFull);
10971078
}
1098-
} else {
1099-
batch_state_lock
1100-
.batch_queue
1101-
.push(new_entry, new_entry_priority);
11021079
}
11031080

11041081
// Update metrics

batcher/aligned-batcher/src/types/batch_queue.rs

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use aligned_sdk::{
66
},
77
};
88
use ethers::types::{Address, Signature, U256};
9-
use priority_queue::PriorityQueue;
9+
use priority_queue::DoublePriorityQueue;
1010
use std::{
1111
hash::{Hash, Hasher},
1212
ops::ControlFlow,
@@ -26,8 +26,8 @@ pub(crate) struct BatchQueueEntry {
2626

2727
#[derive(Clone)]
2828
pub(crate) struct BatchQueueEntryPriority {
29-
max_fee: U256,
30-
nonce: U256,
29+
pub max_fee: U256,
30+
pub nonce: U256,
3131
}
3232

3333
impl BatchQueueEntry {
@@ -119,7 +119,7 @@ impl Ord for BatchQueueEntryPriority {
119119
}
120120
}
121121

122-
pub(crate) type BatchQueue = PriorityQueue<BatchQueueEntry, BatchQueueEntryPriority>;
122+
pub(crate) type BatchQueue = DoublePriorityQueue<BatchQueueEntry, BatchQueueEntryPriority>;
123123

124124
/// Calculates the size of the batch represented by the given batch queue.
125125
pub(crate) fn calculate_batch_size(batch_queue: &BatchQueue) -> Result<usize, BatcherError> {
@@ -165,7 +165,7 @@ pub(crate) fn try_build_batch(
165165
let mut finalized_batch = batch_queue;
166166
let mut batch_size = calculate_batch_size(&finalized_batch)?;
167167

168-
while let Some((entry, _)) = finalized_batch.peek() {
168+
while let Some((entry, _)) = finalized_batch.peek_max() {
169169
let batch_len = finalized_batch.len();
170170
let fee_per_proof = calculate_fee_per_proof(batch_len, gas_price, constant_gas_cost);
171171

@@ -186,7 +186,7 @@ pub(crate) fn try_build_batch(
186186
.len();
187187
batch_size -= verification_data_size;
188188

189-
finalized_batch.pop();
189+
finalized_batch.pop_max();
190190

191191
continue;
192192
}
@@ -201,7 +201,7 @@ pub(crate) fn try_build_batch(
201201
return Err(BatcherError::BatchCostTooHigh);
202202
}
203203

204-
Ok(finalized_batch.clone().into_sorted_vec())
204+
Ok(finalized_batch.clone().into_descending_sorted_vec())
205205
}
206206

207207
fn calculate_fee_per_proof(batch_len: usize, gas_price: U256, constant_gas_cost: u128) -> U256 {
@@ -212,27 +212,6 @@ fn calculate_fee_per_proof(batch_len: usize, gas_price: U256, constant_gas_cost:
212212
U256::from(gas_per_proof) * gas_price
213213
}
214214

215-
pub(crate) fn get_lowest_priority_entry(
216-
batch_queue: &BatchQueue,
217-
) -> Option<(BatchQueueEntry, BatchQueueEntryPriority)> {
218-
let mut lowest_fee_entry: Option<(BatchQueueEntry, BatchQueueEntryPriority)> = None;
219-
220-
for (entry, priority) in batch_queue {
221-
match &lowest_fee_entry {
222-
Some((e, p)) => {
223-
if *priority < *p {
224-
lowest_fee_entry = Some((e.clone(), p.clone()));
225-
}
226-
}
227-
None => {
228-
lowest_fee_entry = Some((entry.clone(), priority.clone()));
229-
}
230-
}
231-
}
232-
233-
lowest_fee_entry
234-
}
235-
236215
#[cfg(test)]
237216
mod test {
238217
use aligned_sdk::core::constants::DEFAULT_CONSTANT_GAS_COST;

0 commit comments

Comments
 (0)