@@ -5,32 +5,33 @@ use std::net::SocketAddr;
55use std:: sync:: Arc ;
66use std:: time:: Duration ;
77
8- use crate :: eth:: BatchVerifiedEventStream ;
9- use aligned_batcher_lib:: types:: {
10- BatchInclusionData , ClientMessage , VerificationCommitmentBatch , VerificationData ,
11- VerificationDataCommitment ,
12- } ;
138use aws_sdk_s3:: client:: Client as S3Client ;
14- use eth:: { BatchVerifiedFilter , BatcherPaymentService } ;
159use ethers:: prelude:: { Middleware , Provider } ;
1610use ethers:: providers:: Ws ;
1711use ethers:: types:: { Address , U256 } ;
1812use futures_util:: stream:: { self , SplitSink } ;
1913use futures_util:: { future, SinkExt , StreamExt , TryStreamExt } ;
2014use lambdaworks_crypto:: merkle_tree:: merkle:: MerkleTree ;
21- use log:: { debug, error, info} ;
15+ use log:: { debug, error, info, warn } ;
2216use tokio:: net:: { TcpListener , TcpStream } ;
2317use tokio:: sync:: { Mutex , RwLock } ;
2418use tokio:: time:: timeout;
2519use tokio_tungstenite:: tungstenite:: error:: ProtocolError ;
2620use tokio_tungstenite:: tungstenite:: protocol:: { frame:: coding:: CloseCode , CloseFrame } ;
2721use tokio_tungstenite:: tungstenite:: { Error , Message } ;
2822use tokio_tungstenite:: WebSocketStream ;
23+
24+ use aligned_batcher_lib:: types:: {
25+ BatchInclusionData , ClientMessage , VerificationCommitmentBatch , VerificationData ,
26+ VerificationDataCommitment ,
27+ } ;
28+ use eth:: { BatchVerifiedFilter , BatcherPaymentService } ;
2929use types:: batch_queue:: BatchQueue ;
3030use types:: errors:: BatcherError ;
3131
32- use crate :: config:: { ConfigFromYaml , ContractDeploymentOutput } ;
32+ use crate :: config:: { ConfigFromYaml , ContractDeploymentOutput , NonPayingConfig } ;
3333use crate :: eth:: AlignedLayerServiceManager ;
34+ use crate :: eth:: BatchVerifiedEventStream ;
3435
3536mod config;
3637mod eth;
@@ -59,6 +60,7 @@ pub struct Batcher {
5960 last_uploaded_batch_block : Mutex < u64 > ,
6061 pre_verification_is_enabled : bool ,
6162 protocol_version : u16 ,
63+ non_paying_config : Option < NonPayingConfig > ,
6264}
6365
6466impl Batcher {
@@ -102,6 +104,11 @@ impl Batcher {
102104 . await
103105 . expect ( "Failed to get Batcher Payment Service contract" ) ;
104106
107+ if let Some ( non_paying_config) = & config. batcher . non_paying {
108+ warn ! ( "Non-paying address configuration detected. Will replace non-paying address {} with configured address {}." ,
109+ non_paying_config. address, non_paying_config. replacement) ;
110+ }
111+
105112 Self {
106113 s3_client,
107114 eth_ws_provider,
@@ -115,6 +122,7 @@ impl Batcher {
115122 last_uploaded_batch_block : Mutex :: new ( last_uploaded_batch_block) ,
116123 pre_verification_is_enabled : config. batcher . pre_verification_is_enabled ,
117124 protocol_version : PROTOCOL_VERSION ,
125+ non_paying_config : config. batcher . non_paying ,
118126 }
119127 }
120128
@@ -197,11 +205,18 @@ impl Batcher {
197205 serde_json:: from_str ( message. to_text ( ) . expect ( "Message is not text" ) )
198206 . expect ( "Failed to deserialize task" ) ;
199207
200- // FIXME: We are not doing anything for the moment with the address from the
201- // sender, this logic should be added for the payment system.
202208 info ! ( "Verifying message signature..." ) ;
203209 let submitter_addr = if let Ok ( addr) = client_msg. verify_signature ( ) {
204210 info ! ( "Message signature verified" ) ;
211+
212+ let mut addr = addr;
213+ if let Some ( non_paying_config) = & self . non_paying_config {
214+ if addr == non_paying_config. address {
215+ info ! ( "Non-paying address detected. Replacing with configured address" ) ;
216+ addr = non_paying_config. replacement ;
217+ }
218+ }
219+
205220 let user_balance = self
206221 . payment_service
207222 . user_balances ( addr)
@@ -215,6 +230,7 @@ impl Batcher {
215230 ProtocolError :: HandshakeIncomplete ,
216231 ) ) ;
217232 }
233+
218234 addr
219235 } else {
220236 error ! ( "Signature verification error" ) ;
@@ -356,10 +372,9 @@ impl Batcher {
356372 let batch_merkle_tree: MerkleTree < VerificationCommitmentBatch > =
357373 MerkleTree :: build ( & batch_data_comm) ;
358374
359- let submitter_addresses: Vec < Address > = finalized_batch
360- . clone ( )
361- . into_iter ( )
362- . map ( |( _, _, _, addr) | addr)
375+ let submitter_addresses = finalized_batch
376+ . iter ( )
377+ . map ( |( _, _, _, addr) | * addr)
363378 . collect ( ) ;
364379
365380 let events = self . service_manager . event :: < BatchVerifiedFilter > ( ) ;
@@ -386,7 +401,10 @@ impl Batcher {
386401 // connected clients
387402 let await_batch_verified_fut =
388403 await_batch_verified_event ( & mut stream, & batch_merkle_tree. root ) ;
389- if ( timeout ( Duration :: from_secs ( 60 ) , await_batch_verified_fut) . await ) . is_err ( ) {
404+ if timeout ( Duration :: from_secs ( 60 ) , await_batch_verified_fut)
405+ . await
406+ . is_err ( )
407+ {
390408 send_timeout_close ( finalized_batch) . await ?;
391409 } else {
392410 send_batch_inclusion_data_responses ( finalized_batch, & batch_merkle_tree) . await ;
0 commit comments