@@ -1579,13 +1579,26 @@ impl Batcher {
15791579 // Close old sink in old entry and replace it with the new one
15801580 {
15811581 if let Some ( messaging_sink) = replacement_entry. messaging_sink {
1582- let mut old_sink = messaging_sink. write ( ) . await ;
1583- if let Err ( e) = old_sink. close ( ) . await {
1584- // we dont want to exit here, just log the error
1585- warn ! ( "Error closing sink: {e:?}" ) ;
1586- } else {
1587- info ! ( "Old websocket sink closed" ) ;
1588- }
1582+ tokio:: spawn ( async move {
1583+ // Before closing the old sink, send a message to the client notifying that their proof
1584+ // has been replaced
1585+ send_message (
1586+ messaging_sink. clone ( ) ,
1587+ SubmitProofResponseMessage :: ProofReplaced ,
1588+ )
1589+ . await ;
1590+
1591+ // Note: This shuts down the sink, but does not wait for it to close, so the other side
1592+ // might not receive the message. However, we don't want to wait here since it would
1593+ // block the batcher.
1594+ let mut old_sink = messaging_sink. write ( ) . await ;
1595+ if let Err ( e) = old_sink. close ( ) . await {
1596+ // we dont want to exit here, just log the error
1597+ warn ! ( "Error closing sink: {e:?}" ) ;
1598+ } else {
1599+ info ! ( "Old websocket sink closed" ) ;
1600+ }
1601+ } ) ;
15891602 } else {
15901603 warn ! (
15911604 "Old websocket sink was empty. This should only happen in testing environments"
@@ -2123,6 +2136,24 @@ impl Batcher {
21232136 warn ! ( "User {:?} has insufficient balance, flushing entire queue as safety measure" , address) ;
21242137
21252138 self . flush_queue_and_clear_nonce_cache ( ) . await ;
2139+
2140+ for entry in finalized_batch {
2141+ if let Some ( ws_sink) = entry. messaging_sink . as_ref ( ) {
2142+ tokio:: spawn ( send_message (
2143+ ws_sink. clone ( ) ,
2144+ SubmitProofResponseMessage :: BatchReset ,
2145+ ) ) ;
2146+ } else {
2147+ warn ! (
2148+ "Websocket sink was found empty. This should only happen in tests"
2149+ ) ;
2150+ }
2151+ }
2152+
2153+ return Err ( BatcherError :: StateCorruptedAndFlushed ( format ! (
2154+ "Queue and user states flushed due to insufficient balance for user {:?}" ,
2155+ address
2156+ ) ) ) ;
21262157 }
21272158 _ => {
21282159 // Add more cases here if we want in the future
@@ -2160,7 +2191,10 @@ impl Batcher {
21602191 let mut batch_state_lock = self . batch_state . lock ( ) . await ;
21612192 for ( entry, _) in batch_state_lock. batch_queue . iter ( ) {
21622193 if let Some ( ws_sink) = entry. messaging_sink . as_ref ( ) {
2163- send_message ( ws_sink. clone ( ) , SubmitProofResponseMessage :: BatchReset ) . await ;
2194+ tokio:: spawn ( send_message (
2195+ ws_sink. clone ( ) ,
2196+ SubmitProofResponseMessage :: BatchReset ,
2197+ ) ) ;
21642198 } else {
21652199 warn ! ( "Websocket sink was found empty. This should only happen in tests" ) ;
21662200 }
@@ -2249,12 +2283,19 @@ impl Batcher {
22492283
22502284 // If batch finalization failed, restore the proofs to the queue
22512285 if let Err ( e) = batch_finalization_result {
2252- error ! (
2253- "Batch finalization failed, restoring proofs to queue: {:?}" ,
2254- e
2255- ) ;
2256- self . restore_proofs_after_batch_failure ( & finalized_batch)
2257- . await ;
2286+ error ! ( "Batch finalization failed: {:?}" , e) ;
2287+
2288+ // If the queue was flushed, don't recover
2289+ match & e {
2290+ BatcherError :: StateCorruptedAndFlushed ( _) => {
2291+ info ! ( "State was corrupted and flushed - not restoring proofs" ) ;
2292+ }
2293+ _ => {
2294+ info ! ( "Restoring proofs to queue after batch failure" ) ;
2295+ self . restore_proofs_after_batch_failure ( & finalized_batch)
2296+ . await ;
2297+ }
2298+ }
22582299 return Err ( e) ;
22592300 }
22602301 }
0 commit comments