Skip to content

Commit 4a9cfdc

Browse files
NicolasRampoldientropidelictaturosati
authored
feat: add to sdk an option to wait for the verification of the batcher in ethereum make it the default (#583)
Co-authored-by: Mariano A. Nicolini <mariano.nicolini.91@gmail.com> Co-authored-by: Tatu <65305492+srosati@users.noreply.github.com>
1 parent aa0d3e6 commit 4a9cfdc

21 files changed

Lines changed: 771 additions & 504 deletions

File tree

batcher/aligned-batcher/src/config/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ impl ConfigFromYaml {
4242

4343
#[derive(Debug, Deserialize)]
4444
pub struct Addresses {
45-
#[serde(rename = "alignedLayerServiceManager")]
46-
pub aligned_layer_service_manager: String,
4745
#[serde(rename = "batcherPaymentService")]
4846
pub batcher_payment_service: String,
4947
}

batcher/aligned-batcher/src/eth/mod.rs

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::sync::Arc;
33

44
use ethers::prelude::k256::ecdsa::SigningKey;
55
use ethers::prelude::*;
6-
use stream::EventStream;
76

87
use crate::config::ECDSAConfig;
98

@@ -22,45 +21,13 @@ pub struct BatchVerified {
2221
pub batch_merkle_root: [u8; 32],
2322
}
2423

25-
pub type AlignedLayerServiceManager =
26-
AlignedLayerServiceManagerContract<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>;
27-
28-
pub type BatchVerifiedEventStream<'s> = EventStream<
29-
's,
30-
FilterWatcher<'s, Http, Log>,
31-
BatchVerifiedFilter,
32-
ContractError<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>,
33-
>;
34-
3524
pub type BatcherPaymentService =
3625
BatcherPaymentServiceContract<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>;
3726

3827
pub fn get_provider(eth_rpc_url: String) -> Result<Provider<Http>, anyhow::Error> {
3928
Provider::<Http>::try_from(eth_rpc_url).map_err(|err| anyhow::anyhow!(err))
4029
}
4130

42-
pub async fn get_service_manager(
43-
provider: Provider<Http>,
44-
ecdsa_config: ECDSAConfig,
45-
contract_address: String,
46-
) -> Result<AlignedLayerServiceManager, anyhow::Error> {
47-
let chain_id = provider.get_chainid().await?;
48-
49-
// get private key from keystore
50-
let wallet = Wallet::decrypt_keystore(
51-
&ecdsa_config.private_key_store_path,
52-
&ecdsa_config.private_key_store_password,
53-
)?
54-
.with_chain_id(chain_id.as_u64());
55-
56-
let signer = Arc::new(SignerMiddleware::new(provider, wallet));
57-
58-
let service_manager =
59-
AlignedLayerServiceManager::new(H160::from_str(contract_address.as_str())?, signer);
60-
61-
Ok(service_manager)
62-
}
63-
6431
pub async fn create_new_task(
6532
payment_service: &BatcherPaymentService,
6633
batch_merkle_root: [u8; 32],

batcher/aligned-batcher/src/gnark/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use aligned_sdk::types::ProvingSystemId;
1+
use aligned_sdk::core::types::ProvingSystemId;
22

33
#[derive(Copy, Clone, Debug)]
44
#[repr(C)]

batcher/aligned-batcher/src/lib.rs

Lines changed: 16 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,16 @@ extern crate core;
22

33
use dotenv::dotenv;
44

5-
use std::borrow::Cow;
65
use std::env;
76
use std::net::SocketAddr;
87
use std::sync::Arc;
9-
use std::time::Duration;
108

11-
use crate::eth::BatchVerifiedEventStream;
12-
use aligned_sdk::types::{
13-
BatchInclusionData, ClientMessage, VerificationCommitmentBatch, VerificationData,
14-
VerificationDataCommitment,
9+
use aligned_sdk::core::types::{
10+
BatchInclusionData, ClientMessage, ResponseMessage, VerificationCommitmentBatch,
11+
VerificationData, VerificationDataCommitment,
1512
};
1613
use aws_sdk_s3::client::Client as S3Client;
17-
use eth::{BatchVerifiedFilter, BatcherPaymentService};
14+
use eth::BatcherPaymentService;
1815
use ethers::prelude::{Middleware, Provider};
1916
use ethers::providers::Ws;
2017
use ethers::types::{Address, U256};
@@ -24,16 +21,13 @@ use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
2421
use log::{debug, error, info, warn};
2522
use tokio::net::{TcpListener, TcpStream};
2623
use tokio::sync::{Mutex, RwLock};
27-
use tokio::time::timeout;
2824
use tokio_tungstenite::tungstenite::error::ProtocolError;
29-
use tokio_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame};
3025
use tokio_tungstenite::tungstenite::{Error, Message};
3126
use tokio_tungstenite::WebSocketStream;
3227
use types::batch_queue::BatchQueue;
3328
use types::errors::BatcherError;
3429

3530
use crate::config::{ConfigFromYaml, ContractDeploymentOutput, NonPayingConfig};
36-
use crate::eth::AlignedLayerServiceManager;
3731

3832
mod config;
3933
mod eth;
@@ -49,7 +43,6 @@ pub struct Batcher {
4943
s3_client: S3Client,
5044
s3_bucket_name: String,
5145
eth_ws_provider: Provider<Ws>,
52-
service_manager: AlignedLayerServiceManager,
5346
payment_service: BatcherPaymentService,
5447
batch_queue: Mutex<BatchQueue>,
5548
max_block_interval: u64,
@@ -90,14 +83,6 @@ impl Batcher {
9083
.try_into()
9184
.unwrap();
9285

93-
let service_manager = eth::get_service_manager(
94-
eth_rpc_provider.clone(),
95-
config.ecdsa.clone(),
96-
deployment_output.addresses.aligned_layer_service_manager,
97-
)
98-
.await
99-
.expect("Failed to get Aligned service manager contract");
100-
10186
let payment_service = eth::get_batcher_payment_service(
10287
eth_rpc_provider,
10388
config.ecdsa,
@@ -115,7 +100,6 @@ impl Batcher {
115100
s3_client,
116101
s3_bucket_name,
117102
eth_ws_provider,
118-
service_manager,
119103
payment_service,
120104
batch_queue: Mutex::new(BatchQueue::new()),
121105
max_block_interval: config.batcher.block_interval,
@@ -173,19 +157,19 @@ impl Batcher {
173157
let (outgoing, incoming) = ws_stream.split();
174158
let outgoing = Arc::new(RwLock::new(outgoing));
175159

176-
// Send the protocol version to the client
177-
let protocol_version_msg = Message::binary(
178-
aligned_sdk::sdk::CURRENT_PROTOCOL_VERSION
179-
.to_be_bytes()
180-
.to_vec(),
160+
let protocol_version_msg = ResponseMessage::ProtocolVersion(
161+
aligned_sdk::communication::protocol::EXPECTED_PROTOCOL_VERSION,
181162
);
182163

164+
let serialized_protocol_version_msg = serde_json::to_vec(&protocol_version_msg)
165+
.expect("Could not serialize protocol version message");
166+
183167
outgoing
184168
.write()
185169
.await
186-
.send(protocol_version_msg)
170+
.send(Message::binary(serialized_protocol_version_msg))
187171
.await
188-
.expect("Failed to send protocol version");
172+
.expect("Could not send protocol version message");
189173

190174
match incoming
191175
.try_filter(|msg| future::ready(msg.is_text()))
@@ -357,7 +341,6 @@ impl Batcher {
357341
&self,
358342
block_number: u64,
359343
finalized_batch: BatchQueue,
360-
wait_for_verification: bool,
361344
) -> Result<(), BatcherError> {
362345
let batch_verification_data: Vec<VerificationData> = finalized_batch
363346
.clone()
@@ -383,12 +366,6 @@ impl Batcher {
383366
.map(|(_, _, _, addr)| *addr)
384367
.collect();
385368

386-
let events = self.service_manager.event::<BatchVerifiedFilter>();
387-
let mut stream = events
388-
.stream()
389-
.await
390-
.map_err(|e| BatcherError::BatchVerifiedEventStreamError(e.to_string()))?;
391-
392369
{
393370
let mut last_uploaded_batch_block = self.last_uploaded_batch_block.lock().await;
394371
// update last uploaded batch block
@@ -402,24 +379,7 @@ impl Batcher {
402379
self.submit_batch(&batch_bytes, &batch_merkle_tree.root, submitter_addresses)
403380
.await;
404381

405-
if !wait_for_verification {
406-
send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await;
407-
return Ok(());
408-
}
409-
410-
// This future is created to be passed to the timeout function, so that if it is not resolved
411-
// within the timeout interval an error is raised. If the event is received, responses are sent to
412-
// connected clients
413-
let await_batch_verified_fut =
414-
await_batch_verified_event(&mut stream, &batch_merkle_tree.root);
415-
if timeout(Duration::from_secs(60), await_batch_verified_fut)
416-
.await
417-
.is_err()
418-
{
419-
send_timeout_close(finalized_batch).await?;
420-
} else {
421-
send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await;
422-
}
382+
send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await;
423383

424384
Ok(())
425385
}
@@ -428,8 +388,7 @@ impl Batcher {
428388
/// finalizes the batch.
429389
async fn handle_new_block(&self, block_number: u64) -> Result<(), BatcherError> {
430390
while let Some(finalized_batch) = self.is_batch_ready(block_number).await {
431-
self.finalize_batch(block_number, finalized_batch, false)
432-
.await?;
391+
self.finalize_batch(block_number, finalized_batch).await?;
433392
}
434393
Ok(())
435394
}
@@ -489,26 +448,6 @@ impl Batcher {
489448
}
490449
}
491450
}
492-
/// Await for the `BatchVerified` event emitted by the Aligned contract and then send responses.
493-
async fn await_batch_verified_event<'s>(
494-
events_stream: &mut BatchVerifiedEventStream<'s>,
495-
batch_merkle_root: &[u8; 32],
496-
) -> Result<(), BatcherError> {
497-
while let Some(event_result) = events_stream.next().await {
498-
if let Ok(event) = event_result {
499-
if &event.batch_merkle_root == batch_merkle_root {
500-
info!("Batch operator signatures verified on Ethereum. Sending response to clients...");
501-
break;
502-
}
503-
} else {
504-
error!("Error awaiting for batch signature verification event");
505-
return Err(BatcherError::BatchVerifiedEventStreamError(
506-
event_result.unwrap_err().to_string(),
507-
));
508-
}
509-
}
510-
Ok(())
511-
}
512451

513452
async fn send_batch_inclusion_data_responses(
514453
finalized_batch: BatchQueue,
@@ -517,7 +456,9 @@ async fn send_batch_inclusion_data_responses(
517456
stream::iter(finalized_batch.iter())
518457
.enumerate()
519458
.for_each(|(vd_batch_idx, (_, _, ws_sink, _))| async move {
520-
let response = BatchInclusionData::new(vd_batch_idx, batch_merkle_tree);
459+
let batch_inclusion_data = BatchInclusionData::new(vd_batch_idx, batch_merkle_tree);
460+
let response = ResponseMessage::BatchInclusionData(batch_inclusion_data);
461+
521462
let serialized_response =
522463
serde_json::to_vec(&response).expect("Could not serialize response");
523464

@@ -537,30 +478,3 @@ async fn send_batch_inclusion_data_responses(
537478
})
538479
.await;
539480
}
540-
541-
/// Send a close response to all clients that included data in the batch indicated that a
542-
/// timeout was exceeded awaiting for the batch verification events
543-
async fn send_timeout_close(finalized_batch: BatchQueue) -> Result<(), BatcherError> {
544-
let timeout_msg = Message::Close(Some(CloseFrame {
545-
code: CloseCode::Protocol,
546-
reason: Cow::from("Timeout: BatchVerified event not received"),
547-
}));
548-
549-
for (_, _, ws_sink, _) in finalized_batch.iter() {
550-
let send_result = ws_sink.write().await.send(timeout_msg.clone()).await;
551-
match send_result {
552-
// When two or more proofs from the same client are included into a batch,
553-
// there will be more than one `ws_sink` corresponding to that client. When one is
554-
// closed, the other ones will raise this error. We can just ignore it.
555-
Err(Error::Protocol(ProtocolError::SendAfterClosing)) => (),
556-
Err(e) => {
557-
error!("Error sending timeout response to clients: {}", e);
558-
return Err(e.into());
559-
}
560-
Ok(_) => (),
561-
}
562-
563-
info!("Timeout close response sent");
564-
}
565-
Ok(())
566-
}

batcher/aligned-batcher/src/types/batch_queue.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures_util::stream::SplitSink;
55
use tokio::{net::TcpStream, sync::RwLock};
66
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
77

8-
use aligned_sdk::types::{VerificationData, VerificationDataCommitment};
8+
use aligned_sdk::core::types::{VerificationData, VerificationDataCommitment};
99

1010
pub(crate) type BatchQueueEntry = (
1111
VerificationData,

batcher/aligned-batcher/src/zk_utils/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
use log::{debug, warn};
2-
3-
use aligned_sdk::types::{ProvingSystemId, VerificationData};
4-
51
use crate::gnark::verify_gnark;
62
use crate::halo2::ipa::verify_halo2_ipa;
73
use crate::halo2::kzg::verify_halo2_kzg;
84
use crate::risc_zero::verify_risc_zero_proof;
95
use crate::sp1::verify_sp1_proof;
6+
use aligned_sdk::core::types::{ProvingSystemId, VerificationData};
7+
use log::{debug, warn};
108

119
pub(crate) fn verify(verification_data: &VerificationData) -> bool {
1210
match verification_data.proving_system {

0 commit comments

Comments
 (0)