@@ -4,15 +4,17 @@ use aligned_sdk::eth::batcher_payment_service::SignatureData;
44use config:: NonPayingConfig ;
55use dotenv:: dotenv;
66use ethers:: signers:: Signer ;
7+ use serde:: Serialize ;
78
9+ use std:: collections:: hash_map:: Entry ;
810use std:: collections:: HashMap ;
911use std:: env;
1012use std:: net:: SocketAddr ;
1113use std:: sync:: Arc ;
1214
1315use aligned_sdk:: core:: types:: {
1416 BatchInclusionData , ClientMessage , NoncedVerificationData , ResponseMessage ,
15- VerificationCommitmentBatch , VerificationDataCommitment ,
17+ ValidityResponseMessage , VerificationCommitmentBatch , VerificationDataCommitment ,
1618} ;
1719use aws_sdk_s3:: client:: Client as S3Client ;
1820use eth:: BatcherPaymentService ;
@@ -106,7 +108,7 @@ impl Batcher {
106108 let non_paying_config = if let Some ( non_paying_config) = config. batcher . non_paying {
107109 warn ! ( "Non-paying address configuration detected. Will replace non-paying address {} with configured address." ,
108110 non_paying_config. address) ;
109- Some ( NonPayingConfig :: from_yaml_config ( non_paying_config, & payment_service ) . await )
111+ Some ( NonPayingConfig :: from_yaml_config ( non_paying_config) . await )
110112 } else {
111113 None
112114 } ;
@@ -220,9 +222,9 @@ impl Batcher {
220222 . await ;
221223 } else {
222224 if !self . check_user_balance ( & addr) . await {
223- send_error_message (
225+ send_message (
224226 ws_conn_sink. clone ( ) ,
225- ResponseMessage :: InsufficientBalanceError ( addr) ,
227+ ValidityResponseMessage :: InsufficientBalance ( addr) ,
226228 )
227229 . await ;
228230
@@ -237,21 +239,15 @@ impl Batcher {
237239 && !zk_utils:: verify ( & nonced_verification_data. verification_data )
238240 {
239241 error ! ( "Invalid proof detected. Verification failed." ) ;
240- send_error_message (
241- ws_conn_sink. clone ( ) ,
242- ResponseMessage :: VerificationError ( ) ,
243- )
244- . await ;
242+ send_message ( ws_conn_sink. clone ( ) , ValidityResponseMessage :: InvalidProof )
243+ . await ;
245244 return Ok ( ( ) ) ; // Send error message to the client and return
246245 }
247246
248247 // Doing nonce verification after proof verification to avoid unnecessary nonce increment
249248 if !self . check_nonce_and_increment ( addr, nonce) . await {
250- send_error_message (
251- ws_conn_sink. clone ( ) ,
252- ResponseMessage :: InvalidNonceError ,
253- )
254- . await ;
249+ send_message ( ws_conn_sink. clone ( ) , ValidityResponseMessage :: InvalidNonce )
250+ . await ;
255251 return Ok ( ( ) ) ; // Send error message to the client and return
256252 }
257253
@@ -263,20 +259,21 @@ impl Batcher {
263259 . await ;
264260 } else {
265261 error ! ( "Proof is too large" ) ;
266- send_error_message ( ws_conn_sink. clone ( ) , ResponseMessage :: ProofTooLargeError ( ) )
262+ send_message ( ws_conn_sink. clone ( ) , ValidityResponseMessage :: ProofTooLarge )
267263 . await ;
268264 return Ok ( ( ) ) ; // Send error message to the client and return
269265 } ;
270266
271267 info ! ( "Verification data message handled" ) ;
272268
269+ send_message ( ws_conn_sink, ValidityResponseMessage :: Valid ) . await ;
273270 return Ok ( ( ) ) ;
274271 }
275272 } else {
276273 error ! ( "Signature verification error" ) ;
277- send_error_message (
274+ send_message (
278275 ws_conn_sink. clone ( ) ,
279- ResponseMessage :: SignatureVerificationError ( ) ,
276+ ValidityResponseMessage :: InvalidSignature ,
280277 )
281278 . await ;
282279 Ok ( ( ) ) // Send error message to the client and return
@@ -482,12 +479,15 @@ impl Batcher {
482479 {
483480 for ( _, _, ws_sink, _) in finalized_batch. iter ( ) {
484481 let merkle_root = hex:: encode ( batch_merkle_tree. root ) ;
485- send_error_message (
482+ send_message (
486483 ws_sink. clone ( ) ,
487484 ResponseMessage :: CreateNewTaskError ( merkle_root) ,
488485 )
489486 . await
490487 }
488+
489+ self . flush_queue_and_clear_nonce_cache ( ) . await ;
490+
491491 return Err ( e) ;
492492 } ;
493493
@@ -496,6 +496,22 @@ impl Batcher {
496496 Ok ( ( ) )
497497 }
498498
499+ async fn flush_queue_and_clear_nonce_cache ( & self ) {
500+ warn ! ( "Resetting state... Flushing queue and nonces" ) ;
501+
502+ let mut batch_queue = self . batch_queue . lock ( ) . await ;
503+ let mut user_nonces = self . user_nonces . lock ( ) . await ;
504+ let mut user_proof_count_in_batch = self . user_proof_count_in_batch . lock ( ) . await ;
505+
506+ for ( _, _, ws_sink, _) in batch_queue. iter ( ) {
507+ send_message ( ws_sink. clone ( ) , ResponseMessage :: BatchReset ) . await ;
508+ }
509+
510+ batch_queue. clear ( ) ;
511+ user_nonces. clear ( ) ;
512+ user_proof_count_in_batch. clear ( ) ;
513+ }
514+
499515 /// Receives new block numbers, checks if conditions are met for submission and
500516 /// finalizes the batch.
501517 async fn handle_new_block ( & self , block_number : u64 ) -> Result < ( ) , BatcherError > {
@@ -576,24 +592,8 @@ impl Batcher {
576592 client_msg : ClientMessage ,
577593 ) -> Result < ( ) , Error > {
578594 let non_paying_config = self . non_paying_config . as_ref ( ) . unwrap ( ) ;
579-
580- // The nonpaying nonce is locked through the entire message processing so that
581- // another incoming connections using the nonpaying address don't desync its nonce
582- let mut nonpaying_nonce = non_paying_config. nonce . lock ( ) . await ;
583595 let addr = non_paying_config. replacement . address ( ) ;
584596
585- let mut nonce_bytes = [ 0u8 ; 32 ] ;
586- nonpaying_nonce. to_big_endian ( & mut nonce_bytes) ;
587- * nonpaying_nonce += U256 :: one ( ) ;
588-
589- let verifcation_data = NoncedVerificationData :: new (
590- client_msg. verification_data . verification_data . clone ( ) ,
591- nonce_bytes,
592- ) ;
593-
594- let client_msg =
595- ClientMessage :: new ( verifcation_data, non_paying_config. replacement . clone ( ) ) ;
596-
597597 let user_balance = self
598598 . payment_service
599599 . user_balances ( addr)
@@ -603,32 +603,57 @@ impl Batcher {
603603
604604 if user_balance == U256 :: from ( 0 ) {
605605 error ! ( "Insufficient funds for address {:?}" , addr) ;
606- send_error_message (
606+ send_message (
607607 ws_conn_sink. clone ( ) ,
608- ResponseMessage :: InsufficientBalanceError ( addr) ,
608+ ValidityResponseMessage :: InsufficientBalance ( addr) ,
609609 )
610610 . await ;
611611 return Ok ( ( ) ) ; // Send error message to the client and return
612612 }
613613
614- let nonce = U256 :: from_big_endian ( client_msg. verification_data . nonce . as_slice ( ) ) ;
615- let nonced_verification_data = client_msg. verification_data ;
616- if nonced_verification_data. verification_data . proof . len ( ) <= self . max_proof_size {
614+ if client_msg. verification_data . verification_data . proof . len ( ) <= self . max_proof_size {
617615 // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
618616 if self . pre_verification_is_enabled
619- && !zk_utils:: verify ( & nonced_verification_data . verification_data )
617+ && !zk_utils:: verify ( & client_msg . verification_data . verification_data )
620618 {
621619 error ! ( "Invalid proof detected. Verification failed." ) ;
622- send_error_message ( ws_conn_sink. clone ( ) , ResponseMessage :: VerificationError ( ) )
623- . await ;
620+ send_message ( ws_conn_sink. clone ( ) , ValidityResponseMessage :: InvalidProof ) . await ;
624621 return Ok ( ( ) ) ; // Send error message to the client and return
625622 }
626623
627- // Doing nonce verification after proof verification to avoid unnecessary nonce increment
628- if !self . check_nonce_and_increment ( addr, nonce) . await {
629- send_error_message ( ws_conn_sink. clone ( ) , ResponseMessage :: InvalidNonceError ) . await ;
630- return Ok ( ( ) ) ; // Send error message to the client and return
631- }
624+ let nonced_verification_data = {
625+ let mut user_nonces = self . user_nonces . lock ( ) . await ;
626+
627+ let nonpaying_nonce = match user_nonces. entry ( addr) {
628+ Entry :: Occupied ( o) => o. into_mut ( ) ,
629+ Entry :: Vacant ( vacant) => {
630+ let nonce = self
631+ . payment_service
632+ . user_nonces ( addr)
633+ . call ( )
634+ . await
635+ . expect ( "Failed to get nonce" ) ;
636+
637+ vacant. insert ( nonce)
638+ }
639+ } ;
640+
641+ debug ! ( "non paying nonce: {:?}" , nonpaying_nonce) ;
642+
643+ let mut nonce_bytes = [ 0u8 ; 32 ] ;
644+ nonpaying_nonce. to_big_endian ( & mut nonce_bytes) ;
645+ * nonpaying_nonce += U256 :: one ( ) ;
646+
647+ NoncedVerificationData :: new (
648+ client_msg. verification_data . verification_data . clone ( ) ,
649+ nonce_bytes,
650+ )
651+ } ;
652+
653+ let client_msg = ClientMessage :: new (
654+ nonced_verification_data. clone ( ) ,
655+ non_paying_config. replacement . clone ( ) ,
656+ ) ;
632657
633658 self . clone ( )
634659 . add_to_batch (
@@ -639,12 +664,13 @@ impl Batcher {
639664 . await ;
640665 } else {
641666 error ! ( "Proof is too large" ) ;
642- send_error_message ( ws_conn_sink. clone ( ) , ResponseMessage :: ProofTooLargeError ( ) ) . await ;
667+ send_message ( ws_conn_sink. clone ( ) , ValidityResponseMessage :: ProofTooLarge ) . await ;
643668 return Ok ( ( ) ) ; // Send error message to the client and return
644669 } ;
645670
646671 info ! ( "Verification data message handled" ) ;
647672
673+ send_message ( ws_conn_sink, ValidityResponseMessage :: Valid ) . await ;
648674 Ok ( ( ) )
649675 }
650676
@@ -687,18 +713,17 @@ async fn send_batch_inclusion_data_responses(
687713 . await ;
688714}
689715
690- async fn send_error_message (
716+ async fn send_message < T : Serialize > (
691717 ws_conn_sink : Arc < RwLock < SplitSink < WebSocketStream < TcpStream > , Message > > > ,
692- error_message : ResponseMessage ,
718+ message : T ,
693719) {
694- let serialized_response =
695- serde_json:: to_vec ( & error_message) . expect ( "Could not serialize response" ) ;
720+ let serialized_response = serde_json:: to_vec ( & message) . expect ( "Could not serialize response" ) ;
696721
697722 // Send error message
698723 ws_conn_sink
699724 . write ( )
700725 . await
701726 . send ( Message :: binary ( serialized_response) )
702727 . await
703- . expect ( "Failed to send error message" ) ;
728+ . expect ( "Failed to send message" ) ;
704729}
0 commit comments