Skip to content

Commit c38b398

Browse files
Merge branch 'staging' into feataggmode/add-grafana-dashboard
2 parents 2aa1550 + 6f2be89 commit c38b398

9 files changed

Lines changed: 85 additions & 20 deletions

File tree

Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,11 @@ agg_mode_gateway_send_payment:
335335
0x922D6956C99E12DFeB3224DEA977D0939758A1Fe \
336336
--private-key 0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d
337337

338+
agg_mode_gateway_send_sp1_proof:
339+
@cargo run --manifest-path aggregation_mode/cli/Cargo.toml -- submit sp1 \
340+
--proof scripts/test_files/sp1/sp1_fibonacci_5_0_0.proof \
341+
--vk scripts/test_files/sp1/sp1_fibonacci_5_0_0_vk.bin \
342+
--private-key "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"
338343

339344
agg_mode_install_cli: ## Install the aggregation mode CLI
340345
@cargo install --path aggregation_mode/cli

aggregation_mode/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aggregation_mode/db/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ edition = "2021"
55

66
[dependencies]
77
tokio = { version = "1"}
8-
# TODO: enable tls
9-
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] }
8+
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate", "chrono" ] }
109

1110

1211
[[bin]]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE tasks add COLUMN status_updated_at TIMESTAMPTZ DEFAULT now();

aggregation_mode/db/src/types.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use sqlx::{
22
prelude::FromRow,
3-
types::{BigDecimal, Uuid},
3+
types::{
4+
chrono::{DateTime, Utc},
5+
BigDecimal, Uuid,
6+
},
47
Type,
58
};
69

@@ -21,6 +24,7 @@ pub struct Task {
2124
pub program_commitment: Vec<u8>,
2225
pub merkle_path: Option<Vec<u8>>,
2326
pub status: TaskStatus,
27+
pub status_updated_at: DateTime<Utc>,
2428
}
2529

2630
#[derive(Debug, Clone, FromRow)]

aggregation_mode/gateway/src/http.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ impl GatewayServer {
7474
App::new()
7575
.app_data(Data::new(state.clone()))
7676
.wrap(prometheus.clone())
77+
.route("/", web::get().to(Self::get_root))
7778
.route("/nonce/{address}", web::get().to(Self::get_nonce))
7879
.route("/receipts", web::get().to(Self::get_receipts))
7980
.route("/proof/sp1", web::post().to(Self::post_proof_sp1))
@@ -87,6 +88,11 @@ impl GatewayServer {
8788
.expect("Server to never end");
8889
}
8990

91+
// Returns an OK response (code 200), regardless of what the request contains
92+
async fn get_root(_req: HttpRequest) -> impl Responder {
93+
HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!({})))
94+
}
95+
9096
// Returns the nonce (number of submitted tasks) for a given address
9197
async fn get_nonce(req: HttpRequest) -> impl Responder {
9298
let Some(address_raw) = req.match_info().get("address") else {

aggregation_mode/proof_aggregator/src/backend/db.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,18 @@ impl Db {
2323
Ok(Self { pool })
2424
}
2525

26-
pub async fn get_pending_tasks_and_mark_them_as_processing(
26+
/// Fetches tasks that are ready to be processed and atomically updates their status.
27+
///
28+
/// This function selects up to `limit` tasks for the given `proving_system_id` that are
29+
/// either:
30+
/// - in `pending` status, or
31+
/// - in `processing` status but whose `status_updated_at` timestamp is older than 12 hours
32+
/// (to recover tasks that may have been abandoned or stalled).
33+
///
34+
/// The selected rows are locked using `FOR UPDATE SKIP LOCKED` to ensure safe concurrent
35+
/// processing by multiple workers. All selected tasks have their status set to
36+
/// `processing` and their `status_updated_at` updated to `now()` before being returned.
37+
pub async fn get_tasks_to_process_and_update_their_status(
2738
&self,
2839
proving_system_id: i32,
2940
limit: i64,
@@ -32,12 +43,19 @@ impl Db {
3243
"WITH selected AS (
3344
SELECT task_id
3445
FROM tasks
35-
WHERE proving_system_id = $1 AND status = 'pending'
46+
WHERE proving_system_id = $1
47+
AND (
48+
status = 'pending'
49+
OR (
50+
status = 'processing'
51+
AND status_updated_at <= now() - interval '12 hours'
52+
)
53+
)
3654
LIMIT $2
3755
FOR UPDATE SKIP LOCKED
3856
)
3957
UPDATE tasks t
40-
SET status = 'processing'
58+
SET status = 'processing', status_updated_at = now()
4159
FROM selected s
4260
WHERE t.task_id = s.task_id
4361
RETURNING t.*;",
@@ -61,7 +79,7 @@ impl Db {
6179

6280
for (task_id, merkle_path) in updates {
6381
if let Err(e) = sqlx::query(
64-
"UPDATE tasks SET merkle_path = $1, status = 'verified', proof = NULL WHERE task_id = $2",
82+
"UPDATE tasks SET merkle_path = $1, status = 'verified', status_updated_at = now(), proof = NULL WHERE task_id = $2",
6583
)
6684
.bind(merkle_path)
6785
.bind(task_id)
@@ -83,6 +101,20 @@ impl Db {
83101
Ok(())
84102
}
85103

86-
// TODO: this should be used when rolling back processing proofs on unexpected errors
87-
pub async fn mark_tasks_as_pending(&self) {}
104+
pub async fn mark_tasks_as_pending(&self, tasks_id: &[Uuid]) -> Result<(), DbError> {
105+
if tasks_id.is_empty() {
106+
return Ok(());
107+
}
108+
109+
sqlx::query(
110+
"UPDATE tasks SET status = 'pending', status_updated_at = now()
111+
WHERE task_id = ANY($1) AND status = 'processing'",
112+
)
113+
.bind(tasks_id)
114+
.execute(&self.pool)
115+
.await
116+
.map_err(|e| DbError::Query(e.to_string()))?;
117+
118+
Ok(())
119+
}
88120
}

aggregation_mode/proof_aggregator/src/backend/fetcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl ProofsFetcher {
3030
) -> Result<(Vec<AlignedProof>, Vec<Uuid>), ProofsFetcherError> {
3131
let tasks = self
3232
.db
33-
.get_pending_tasks_and_mark_them_as_processing(engine.proving_system_id() as i32, limit)
33+
.get_tasks_to_process_and_update_their_status(engine.proving_system_id() as i32, limit)
3434
.await
3535
.map_err(ProofsFetcherError::Query)?;
3636

aggregation_mode/proof_aggregator/src/backend/mod.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,28 +119,42 @@ impl ProofAggregator {
119119
info!("Starting proof aggregator service");
120120

121121
info!("About to aggregate and submit proof to be verified on chain");
122-
let res = self.aggregate_and_submit_proofs_on_chain().await;
122+
123+
let (proofs, tasks_id) = match self
124+
.fetcher
125+
.fetch_pending_proofs(self.engine.clone(), self.config.total_proofs_limit as i64)
126+
.await
127+
.map_err(AggregatedProofSubmissionError::FetchingProofs)
128+
{
129+
Ok(res) => res,
130+
Err(e) => {
131+
error!("Error while aggregating and submitting proofs: {:?}", e);
132+
return;
133+
}
134+
};
135+
136+
let res = self
137+
.aggregate_and_submit_proofs_on_chain((proofs, &tasks_id))
138+
.await;
123139

124140
match res {
125141
Ok(()) => {
126142
info!("Process finished successfully");
127143
}
128144
Err(err) => {
129145
error!("Error while aggregating and submitting proofs: {:?}", err);
146+
warn!("Marking tasks back to pending after failure");
147+
if let Err(e) = self.db.mark_tasks_as_pending(&tasks_id).await {
148+
error!("Error while marking proofs to pending again: {:?}", e);
149+
};
130150
}
131151
}
132152
}
133153

134-
// TODO: on failure, mark proofs as pending again
135154
async fn aggregate_and_submit_proofs_on_chain(
136155
&mut self,
156+
(proofs, tasks_id): (Vec<AlignedProof>, &[Uuid]),
137157
) -> Result<(), AggregatedProofSubmissionError> {
138-
let (proofs, tasks_id) = self
139-
.fetcher
140-
.fetch_pending_proofs(self.engine.clone(), self.config.total_proofs_limit as i64)
141-
.await
142-
.map_err(AggregatedProofSubmissionError::FetchingProofs)?;
143-
144158
if proofs.is_empty() {
145159
warn!("No proofs collected, skipping aggregation...");
146160
return Ok(());
@@ -215,7 +229,7 @@ impl ProofAggregator {
215229

216230
info!("Storing merkle paths for each task...",);
217231
let mut merkle_paths_for_tasks: Vec<(Uuid, Vec<u8>)> = vec![];
218-
for (idx, task_id) in tasks_id.into_iter().enumerate() {
232+
for (idx, task_id) in tasks_id.iter().enumerate() {
219233
let Some(proof) = merkle_tree.get_proof_by_pos(idx) else {
220234
warn!("Proof not found for task id {task_id}");
221235
continue;
@@ -226,7 +240,7 @@ impl ProofAggregator {
226240
.flat_map(|e| e.to_vec())
227241
.collect::<Vec<_>>();
228242

229-
merkle_paths_for_tasks.push((task_id, proof_bytes))
243+
merkle_paths_for_tasks.push((*task_id, proof_bytes))
230244
}
231245
self.db
232246
.insert_tasks_merkle_path_and_mark_them_as_verified(merkle_paths_for_tasks)

0 commit comments

Comments
 (0)