@@ -11,14 +11,20 @@ use alloy::{
1111} ;
1212use sqlx:: types:: BigDecimal ;
1313
14+ #[ derive( Debug , Clone ) ]
15+ pub enum PaymentsPollerError {
16+ ReadLastBlockError ( String ) ,
17+ }
18+
1419pub struct PaymentsPoller {
1520 db : Db ,
1621 proof_aggregation_service : AggregationModePaymentServiceContract ,
1722 rpc_provider : RpcProvider ,
23+ config : Config ,
1824}
1925
2026impl PaymentsPoller {
21- pub fn new ( db : Db , config : Config ) -> Self {
27+ pub fn new ( db : Db , config : Config ) -> Result < Self , PaymentsPollerError > {
2228 let rpc_url = config. eth_rpc_url . parse ( ) . expect ( "RPC URL should be valid" ) ;
2329 let rpc_provider = ProviderBuilder :: new ( ) . connect_http ( rpc_url) ;
2430 let proof_aggregation_service = AggregationModePaymentService :: new (
@@ -27,16 +33,32 @@ impl PaymentsPoller {
2733 rpc_provider. clone ( ) ,
2834 ) ;
2935
30- Self {
36+ // This check is here to catch early failures on last block fetching
37+ let _ = config
38+ . get_last_block_fetched ( )
39+ . map_err ( |err| PaymentsPollerError :: ReadLastBlockError ( err. to_string ( ) ) ) ;
40+
41+ Ok ( Self {
3142 db,
3243 proof_aggregation_service,
3344 rpc_provider,
34- }
45+ config,
46+ } )
3547 }
3648
3749 pub async fn start ( & self ) {
3850 let seconds_to_wait_between_polls = 12 ;
51+
3952 loop {
53+ let Ok ( last_block_fetched) = self . config . get_last_block_fetched ( ) else {
54+ tracing:: warn!( "Could not get last block fetched, skipping polling iteration..." ) ;
55+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs (
56+ seconds_to_wait_between_polls,
57+ ) )
58+ . await ;
59+ continue ;
60+ } ;
61+
4062 let Ok ( current_block) = self . rpc_provider . get_block_number ( ) . await else {
4163 tracing:: warn!( "Could not get current block skipping polling iteration..." ) ;
4264 tokio:: time:: sleep ( std:: time:: Duration :: from_secs (
@@ -46,10 +68,13 @@ impl PaymentsPoller {
4668 continue ;
4769 } ;
4870
71+ let start_block = last_block_fetched. saturating_sub ( 5 ) ;
72+ tracing:: info!( "Fetching logs from block {start_block} to {current_block}" ) ;
73+
4974 let Ok ( logs) = self
5075 . proof_aggregation_service
5176 . UserPayment_filter ( )
52- . from_block ( current_block - 5 )
77+ . from_block ( start_block )
5378 . to_block ( current_block)
5479 . query ( )
5580 . await
@@ -91,6 +116,11 @@ impl PaymentsPoller {
91116 }
92117 }
93118
119+ if let Err ( err) = self . config . update_last_block_fetched ( current_block) {
120+ tracing:: error!( "Failed to update the last aggregated block: {err}" ) ;
121+ continue ;
122+ } ;
123+
94124 tokio:: time:: sleep ( std:: time:: Duration :: from_secs (
95125 seconds_to_wait_between_polls,
96126 ) )
0 commit comments