Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions aggregation_mode/src/backend/config.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
use serde::Deserialize;
use std::{fs::File, io::Read};
use serde::{Deserialize, Serialize};
use std::{fs::File, fs::OpenOptions, io::Read, io::Write};

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
/// Location and credentials of the aggregator's ECDSA keystore.
pub struct ECDSAConfig {
    // Filesystem path to the encrypted keystore file.
    pub private_key_store_path: String,
    // Password used to decrypt the keystore (may be empty, see config files).
    pub private_key_store_password: String,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
/// JSON shape of the on-disk checkpoint file, e.g. {"last_processed_block":0}.
pub struct LastProcessedBlock {
    // Block number up to which batch logs have already been processed.
    pub last_processed_block: u64,
}

#[derive(Debug, Deserialize, Serialize)]
/// Service configuration, loaded from a YAML file (see config-files/).
pub struct Config {
    // HTTP JSON-RPC endpoint of the Ethereum node.
    pub eth_rpc_url: String,
    // WebSocket endpoint of the Ethereum node.
    pub eth_ws_url: String,
    // Upper bound on proofs kept queued for aggregation.
    pub max_proofs_in_queue: u16,
    // On-chain address of the proof aggregation service contract (hex string).
    pub proof_aggregation_service_address: String,
    // On-chain address of the AlignedLayerServiceManager contract (hex string).
    pub aligned_service_manager_address: String,
    // Path of the JSON checkpoint file read/written by
    // get_last_processed_block / update_last_processed_block.
    pub last_processed_block_filepath: String,
    // Keystore settings for signing.
    pub ecdsa: ECDSAConfig,
}

impl Config {
Expand All @@ -27,4 +31,39 @@ impl Config {
let config: Config = serde_yaml::from_str(&contents)?;
Ok(config)
}

/// Reads the last processed block number from the checkpoint file at
/// `self.last_processed_block_filepath`.
///
/// Returns 0 when the file does not exist yet (first run). Any other
/// failure — an unreadable file, a permission error, or malformed JSON —
/// is propagated to the caller rather than being silently treated as
/// "start from block 0", which would make the fetcher re-scan from genesis.
pub fn get_last_processed_block(&self) -> Result<u64, Box<dyn std::error::Error>> {
    match File::open(&self.last_processed_block_filepath) {
        // First run: no checkpoint written yet, default to block 0.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(0),
        // Other open failures (e.g. permissions) are real errors.
        Err(e) => Err(Box::new(e)),
        Ok(mut file) => {
            let mut contents = String::new();
            file.read_to_string(&mut contents)?;
            let lpb: LastProcessedBlock = serde_json::from_str(&contents)?;
            Ok(lpb.last_processed_block)
        }
    }
}

/// Persists `last_processed_block` to the checkpoint file as JSON,
/// creating the file if it does not exist and replacing its contents
/// otherwise.
///
/// # Errors
/// Returns an error if serialization or the filesystem write fails.
pub fn update_last_processed_block(
    &self,
    last_processed_block: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    let checkpoint = LastProcessedBlock {
        last_processed_block,
    };

    // Serialize BEFORE touching the file: the previous implementation
    // opened with truncate(true) first, so a serialization error would
    // have destroyed the existing checkpoint.
    let content = serde_json::to_string(&checkpoint)?;

    // fs::write creates-or-truncates and writes in one call, replacing
    // the manual OpenOptions + write_all sequence.
    std::fs::write(&self.last_processed_block_filepath, content)?;

    Ok(())
}
}
45 changes: 26 additions & 19 deletions aggregation_mode/src/backend/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ pub enum ProofsFetcherError {
pub struct ProofsFetcher {
rpc_provider: RPCProvider,
aligned_service_manager: AlignedLayerServiceManagerContract,
fetch_from_secs_ago: u64,
block_time_secs: u64,
last_processed_block: u64,
}

impl ProofsFetcher {
Expand All @@ -38,31 +37,47 @@ impl ProofsFetcher {
rpc_provider.clone(),
);

let last_processed_block = config.get_last_processed_block().unwrap();

Self {
rpc_provider,
aligned_service_manager,
fetch_from_secs_ago: config.fetch_logs_from_secs_ago,
block_time_secs: config.block_time_secs,
last_processed_block,
}
}

pub async fn fetch(&self) -> Result<Vec<AlignedProof>, ProofsFetcherError> {
let from_block = self.get_block_number_to_fetch_from().await?;
pub async fn fetch(&mut self) -> Result<Vec<AlignedProof>, ProofsFetcherError> {
// Get current block
let current_block = self
.rpc_provider
.get_block_number()
.await
.map_err(|_| ProofsFetcherError::BlockNumber)?;

if current_block < self.last_processed_block {
return Err(ProofsFetcherError::BlockNumber);
}

info!(
"Fetching proofs from batch logs starting from block number {}",
from_block
"Fetching proofs from batch logs starting from block number {} upto {}",
self.last_processed_block, current_block
);

// Subscribe to NewBatch event from AlignedServiceManager
let logs = self
.aligned_service_manager
.NewBatchV3_filter()
.from_block(from_block)
.from_block(self.last_processed_block)
.to_block(current_block)
.query()
.await
.map_err(|_| ProofsFetcherError::QueryingLogs)?;

info!("Logs collected {}", logs.len());

// Update last processed block after collecting logs
self.last_processed_block = current_block;

let mut proofs = vec![];

for (batch, _) in logs {
Expand Down Expand Up @@ -119,15 +134,7 @@ impl ProofsFetcher {
Ok(proofs)
}

async fn get_block_number_to_fetch_from(&self) -> Result<u64, ProofsFetcherError> {
let block_number = self
.rpc_provider
.get_block_number()
.await
.map_err(|_| ProofsFetcherError::BlockNumber)?;

let number_of_blocks_in_the_past = self.fetch_from_secs_ago / self.block_time_secs;

Ok(block_number.saturating_sub(number_of_blocks_in_the_past))
/// Returns the block number up to which this fetcher has collected logs
/// (updated after each successful fetch; persisted by the caller).
pub fn get_last_processed_block(&self) -> u64 {
    self.last_processed_block
}
}
5 changes: 4 additions & 1 deletion aggregation_mode/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,17 @@ impl ProofAggregator {
}
}

pub async fn start(&mut self) {
pub async fn start(&mut self, config: &Config) {
info!("Starting proof aggregator service",);

info!("About to aggregate and submit proof to be verified on chain");
let res = self.aggregate_and_submit_proofs_on_chain().await;

match res {
Ok(()) => {
config
.update_last_processed_block(self.fetcher.get_last_processed_block())
.unwrap();
info!("Process finished successfully");
}
Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion aggregation_mode/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ async fn main() {
tracing::info!("Config loaded");

let mut proof_aggregator = ProofAggregator::new(&config);
proof_aggregator.start().await;
proof_aggregator.start(&config).await;
}
18 changes: 7 additions & 11 deletions config-files/config-proof-aggregator-mock.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
aligned_service_manager_address: "0x851356ae760d987E095750cCeb3bC6014560891C"
proof_aggregation_service_address: "0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07"
eth_rpc_url: "http://localhost:8545"
eth_ws_url: "ws://localhost:8545"
eth_rpc_url: http://localhost:8545
eth_ws_url: ws://localhost:8545
max_proofs_in_queue: 1000
# How far in the past should the service go to fetch batch logs
fetch_logs_from_secs_ago: 86400 # 24hs
# Anvil is started with a block time of 7 seconds
block_time_secs: 7

proof_aggregation_service_address: 0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07
aligned_service_manager_address: 0x851356ae760d987E095750cCeb3bC6014560891C
last_processed_block_filepath: config-files/proof-aggregator.last_processed_block.json
ecdsa:
private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"
private_key_store_password: ""
private_key_store_path: config-files/anvil.proof-aggregator.ecdsa.key.json
private_key_store_password: ''
5 changes: 1 addition & 4 deletions config-files/config-proof-aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ proof_aggregation_service_address: "0xcbEAF3BDe82155F56486Fb5a1072cb8baAf547cc"
eth_rpc_url: "http://localhost:8545"
eth_ws_url: "ws://localhost:8545"
max_proofs_in_queue: 1000
# How far in the past should the service go to fetch batch logs
fetch_logs_from_secs_ago: 86400 # 24hs
# Anvil is started with a block time of 7 seconds
block_time_secs: 7
last_processed_block_filepath: config-files/proof-aggregator.last_processed_block.json
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idem the other suggestion

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


ecdsa:
private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"
Expand Down
1 change: 1 addition & 0 deletions config-files/proof-aggregator.last_processed_block.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"last_processed_block":0}
Loading