Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a51d8f7
feat(aggregation-mode): add grafana dashboard
maximopalopoli Dec 19, 2025
94c0404
fix: return BadRequest for the request denied exams
maximopalopoli Dec 19, 2025
9ce4dde
Update the dashboard to include code ranges (2xx, 4xx, 5xx)
maximopalopoli Dec 22, 2025
531e929
Filter by range in dashboards and remove the /metrics reqs in 2xx one
maximopalopoli Dec 22, 2025
61f889a
Add time_elapsed_db_post metric to have a metric in use
maximopalopoli Dec 22, 2025
00d5338
Add the payments poller metrics integration
maximopalopoli Dec 22, 2025
77fc789
fix clippy lints on agg mode
maximopalopoli Dec 22, 2025
710ff46
Remove unnecessary labeling on actix prometheus metrics builder
maximopalopoli Dec 22, 2025
3aeb64a
use tracing::error instead of eprintln
maximopalopoli Dec 22, 2025
07eb91b
Remove anyhow dependency
maximopalopoli Dec 22, 2025
dad51de
Convert the warp servers into
maximopalopoli Dec 22, 2025
09cc63c
Merge branch 'staging' into feataggmode/add-grafana-dashboard
maximopalopoli Dec 23, 2025
deaa77a
Add comment about the actix http prometheus server
maximopalopoli Dec 23, 2025
09eb015
Return an Arc in the Metrics structs start method
maximopalopoli Dec 23, 2025
7ee8201
Make the time_elapsed_db_query to be a histogram vec
maximopalopoli Dec 23, 2025
b50ebb4
Change the actix prometheus version to the latest stable
maximopalopoli Jan 5, 2026
2aa1550
Fix expression in grafana dashboard to match the rate interval instea…
maximopalopoli Jan 5, 2026
c38b398
Merge branch 'staging' into feataggmode/add-grafana-dashboard
maximopalopoli Jan 6, 2026
f4696d8
update the grafana dashboard timeline queries
maximopalopoli Jan 6, 2026
816e710
Improve naming on time elapsed vars of http server
maximopalopoli Jan 6, 2026
c00abcd
simplify the name for the prometheus jobs and bots for agg mode
maximopalopoli Jan 6, 2026
c608dc2
update the agg mode dash with the new names
maximopalopoli Jan 6, 2026
3281d07
Merge branch 'staging' into feataggmode/add-grafana-dashboard
maximopalopoli Jan 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 257 additions & 11 deletions aggregation_mode/Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions aggregation_mode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ aligned-sdk = { path = "../crates/sdk/" }
db = { path = "./db" }
sp1-sdk = "5.0.0"
risc0-zkvm = { version = "3.0.3" }
prometheus = { version = "0.13.4", features = ["process"] }
anyhow = { version = "1.0" }
warp = "0.3.7"

[profile.release]
opt-level = 3
4 changes: 4 additions & 0 deletions aggregation_mode/gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ edition = "2021"
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
prometheus = { workspace = true }
anyhow = { workspace = true }
warp = { workspace = true }
agg_mode_sdk = { path = "../sdk"}
aligned-sdk = { workspace = true }
sp1-sdk = { workspace = true }
Expand All @@ -15,6 +18,7 @@ tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
bincode = "1.3.3"
actix-web = "4"
actix-multipart = "0.7.2"
actix-web-prometheus = "0.1.0-beta.8"
alloy = { workspace = true }
tokio = { version = "1", features = ["time"]}
# TODO: enable tls
Expand Down
1 change: 1 addition & 0 deletions aggregation_mode/gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct Config {
pub db_connection_url: String,
pub network: String,
pub max_daily_proofs_per_user: i64,
pub gateway_metrics_port: u16,
}

impl Config {
Expand Down
41 changes: 36 additions & 5 deletions aggregation_mode/gateway/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{
collections::HashMap,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
time::{Instant, SystemTime, UNIX_EPOCH},
};

use actix_multipart::form::MultipartForm;
use actix_web::{
web::{self, Data},
App, HttpRequest, HttpResponse, HttpServer, Responder,
};
use actix_web_prometheus::PrometheusMetricsBuilder;
use agg_mode_sdk::types::Network;
use aligned_sdk::aggregation_layer::AggregationModeProvingSystem;
use alloy::signers::Signature;
Expand All @@ -23,6 +25,7 @@ use crate::{
config::Config,
db::Db,
helpers::get_time_left_day_formatted,
metrics::GatewayMetrics,
types::{GetReceiptsResponse, SubmitProofRequestRisc0, SubmitProofRequestSP1},
verifiers::{verify_sp1_proof, VerificationError},
};
Expand All @@ -32,15 +35,25 @@ pub struct GatewayServer {
db: Db,
config: Config,
network: Network,
metrics: GatewayMetrics,
}

impl GatewayServer {
/// Builds the gateway server state from a database handle and the loaded configuration.
///
/// Parses the configured network name and starts the gateway metrics server on
/// `config.gateway_metrics_port` before assembling the struct.
///
/// # Panics
/// Panics if `config.network` is not a valid network name, or if the metrics
/// server fails to start.
pub fn new(db: Db, config: Config) -> Self {
    let metrics_port = config.gateway_metrics_port;
    let parsed_network =
        Network::from_str(&config.network).expect("A valid network in config file");

    tracing::info!("Starting metrics server on port {}", metrics_port);
    let gateway_metrics =
        GatewayMetrics::start(metrics_port).expect("Failed to start metrics server");

    Self {
        db,
        config,
        network: parsed_network,
        metrics: gateway_metrics,
    }
}

Expand All @@ -49,10 +62,19 @@ impl GatewayServer {
let port = self.config.port;
let state = self.clone();

let mut labels = HashMap::new();
labels.insert("label1".to_string(), "value1".to_string());
let prometheus = PrometheusMetricsBuilder::new("api")
.endpoint("/metrics")
.const_labels(labels)
.build()
.unwrap();

Comment thread
MarcosNicolau marked this conversation as resolved.
tracing::info!("Starting server at port {}", self.config.port);
HttpServer::new(move || {
App::new()
.app_data(Data::new(state.clone()))
.wrap(prometheus.clone())
.route("/nonce/{address}", web::get().to(Self::get_nonce))
.route("/receipts", web::get().to(Self::get_receipts))
.route("/proof/sp1", web::post().to(Self::post_proof_sp1))
Expand Down Expand Up @@ -151,7 +173,7 @@ impl GatewayServer {
if daily_tasks_by_address >= state.config.max_daily_proofs_per_user {
let formatted_time_left = get_time_left_day_formatted();

return HttpResponse::InternalServerError().json(AppResponse::new_unsucessfull(
return HttpResponse::BadRequest().json(AppResponse::new_unsucessfull(
format!(
"Request denied: Query limit exceeded. Quotas renew in {formatted_time_left}"
)
Expand Down Expand Up @@ -221,6 +243,8 @@ impl GatewayServer {
return HttpResponse::BadRequest().json(AppResponse::new_unsucessfull(message, 400));
};

let start = Instant::now();
Comment thread
MarcosNicolau marked this conversation as resolved.
Outdated

match state
.db
.insert_task(
Expand All @@ -233,9 +257,16 @@ impl GatewayServer {
)
.await
{
Ok(task_id) => HttpResponse::Ok().json(AppResponse::new_sucessfull(
serde_json::json!({ "task_id": task_id.to_string() }),
)),
Ok(task_id) => {
let duration = start.elapsed();
state
.metrics
.register_db_response_time_post(duration.as_secs_f64());
Comment thread
MarcosNicolau marked this conversation as resolved.
Outdated

HttpResponse::Ok().json(AppResponse::new_sucessfull(
serde_json::json!({ "task_id": task_id.to_string() }),
))
}
Err(_) => HttpResponse::InternalServerError()
.json(AppResponse::new_unsucessfull("Internal server error", 500)),
}
Expand Down
1 change: 1 addition & 0 deletions aggregation_mode/gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pub mod config;
pub mod db;
mod helpers;
pub mod http;
mod metrics;
mod types;
mod verifiers;
54 changes: 54 additions & 0 deletions aggregation_mode/gateway/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use prometheus::{self, histogram_opts, register_histogram};
use warp::{reject::Rejection, reply::Reply, Filter};

/// Handles to the Prometheus metrics recorded by the gateway.
#[derive(Debug, Clone)]
pub struct GatewayMetrics {
    /// Histogram registered as `time_elapsed_db_post` ("Time elapsed in DB posts").
    pub time_elapsed_db_post: prometheus::Histogram,
}

impl GatewayMetrics {
    /// Registers the gateway metrics and spawns the HTTP server that exposes them.
    ///
    /// Creates a dedicated registry, registers the `time_elapsed_db_post` histogram
    /// in it, and spawns a warp server on `0.0.0.0:<metrics_port>` that serves the
    /// registry contents under `/metrics` as a background Tokio task.
    ///
    /// # Errors
    /// Returns an error if the histogram cannot be created or registered.
    ///
    /// NOTE(review): `register_histogram!` also registers the histogram in
    /// prometheus' process-wide default registry, so the metric lives in two
    /// registries and a second call to `start` in the same process is expected to
    /// fail with an "already registered" error. Consider building the histogram
    /// with `Histogram::with_opts` so only the local registry is involved.
    pub fn start(metrics_port: u16) -> anyhow::Result<Self> {
        let registry = prometheus::Registry::new();

        let time_elapsed_db_post = register_histogram!(histogram_opts!(
            "time_elapsed_db_post",
            "Time elapsed in DB posts"
        ))?;

        registry.register(Box::new(time_elapsed_db_post.clone()))?;

        // Every request gets its own handle to the registry.
        let metrics_route = warp::path!("metrics")
            .and(warp::any().map(move || registry.clone()))
            .and_then(GatewayMetrics::metrics_handler);

        // The server runs detached; its join handle is intentionally dropped.
        tokio::task::spawn(async move {
            warp::serve(metrics_route)
                .run(([0, 0, 0, 0], metrics_port))
                .await;
        });

        Ok(Self {
            time_elapsed_db_post,
        })
    }

    /// Encodes every metric gathered from `registry` in the Prometheus text format.
    ///
    /// Encoding problems are only logged: the reply then carries whatever was
    /// written to the buffer, or an empty body if that is not valid UTF-8.
    pub async fn metrics_handler(registry: prometheus::Registry) -> Result<impl Reply, Rejection> {
        use prometheus::Encoder;
        let encoder = prometheus::TextEncoder::new();

        let mut buffer = Vec::new();
        if let Err(e) = encoder.encode(&registry.gather(), &mut buffer) {
            eprintln!("could not encode prometheus metrics: {e}");
        }

        // The buffer is not used afterwards, so it is moved into the conversion
        // instead of being cloned on every scrape.
        let res = String::from_utf8(buffer)
            .inspect_err(|e| eprintln!("prometheus metrics could not be parsed correctly: {e}"))
            .unwrap_or_default();

        Ok(res)
    }

    /// Records one observation in the `time_elapsed_db_post` histogram.
    pub fn register_db_response_time_post(&self, value: f64) {
        self.time_elapsed_db_post.observe(value);
    }
}
3 changes: 3 additions & 0 deletions aggregation_mode/payments_poller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
aligned-sdk = { workspace = true }
prometheus = { workspace = true }
anyhow = { workspace = true }
Comment thread
MarcosNicolau marked this conversation as resolved.
Outdated
warp = { workspace = true }
Comment thread
MarcosNicolau marked this conversation as resolved.
Outdated
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
actix-web = "4"
Expand Down
1 change: 1 addition & 0 deletions aggregation_mode/payments_poller/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct Config {
pub eth_rpc_url: String,
pub payment_service_address: String,
pub last_block_fetched_filepath: String,
pub poller_metrics_port: u16,
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down
1 change: 1 addition & 0 deletions aggregation_mode/payments_poller/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod config;
pub mod db;
pub mod metrics;
pub mod payments;
pub mod types;
54 changes: 54 additions & 0 deletions aggregation_mode/payments_poller/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use prometheus::{self, opts, register_gauge};
use warp::{reject::Rejection, reply::Reply, Filter};

/// Handles to the Prometheus metrics recorded by the payments poller.
#[derive(Debug, Clone)]
pub struct PaymentsPollerMetrics {
    /// Gauge registered as `last_processed_block` ("Last processed block by poller").
    pub last_processed_block: prometheus::Gauge,
}

impl PaymentsPollerMetrics {
    /// Registers the poller metrics and spawns the HTTP server that exposes them.
    ///
    /// Creates a dedicated registry, registers the `last_processed_block` gauge in
    /// it, and spawns a warp server on `0.0.0.0:<metrics_port>` that serves the
    /// registry contents under `/metrics` as a background Tokio task.
    ///
    /// # Errors
    /// Returns an error if the gauge cannot be created or registered.
    ///
    /// NOTE(review): `register_gauge!` also registers the gauge in prometheus'
    /// process-wide default registry, so the metric lives in two registries and a
    /// second call to `start` in the same process is expected to fail with an
    /// "already registered" error. Consider building the gauge with
    /// `Gauge::with_opts` so only the local registry is involved.
    pub fn start(metrics_port: u16) -> anyhow::Result<Self> {
        let registry = prometheus::Registry::new();

        let last_processed_block = register_gauge!(opts!(
            "last_processed_block",
            "Last processed block by poller"
        ))?;

        registry.register(Box::new(last_processed_block.clone()))?;

        // Every request gets its own handle to the registry.
        let metrics_route = warp::path!("metrics")
            .and(warp::any().map(move || registry.clone()))
            .and_then(PaymentsPollerMetrics::metrics_handler);

        // The server runs detached; its join handle is intentionally dropped.
        tokio::task::spawn(async move {
            warp::serve(metrics_route)
                .run(([0, 0, 0, 0], metrics_port))
                .await;
        });

        Ok(Self {
            last_processed_block,
        })
    }

    /// Encodes every metric gathered from `registry` in the Prometheus text format.
    ///
    /// Encoding problems are only logged: the reply then carries whatever was
    /// written to the buffer, or an empty body if that is not valid UTF-8.
    pub async fn metrics_handler(registry: prometheus::Registry) -> Result<impl Reply, Rejection> {
        use prometheus::Encoder;
        let encoder = prometheus::TextEncoder::new();

        let mut buffer = Vec::new();
        if let Err(e) = encoder.encode(&registry.gather(), &mut buffer) {
            eprintln!("could not encode prometheus metrics: {e}");
        }

        // The buffer is not used afterwards, so it is moved into the conversion
        // instead of being cloned on every scrape.
        let res = String::from_utf8(buffer)
            .inspect_err(|e| eprintln!("prometheus metrics could not be parsed correctly: {e}"))
            .unwrap_or_default();

        Ok(res)
    }

    /// Sets the `last_processed_block` gauge to `value`.
    pub fn register_last_processed_block(&self, value: u64) {
        // Gauges store `f64`; the cast is exact for values up to 2^53.
        self.last_processed_block.set(value as f64);
    }
}
12 changes: 12 additions & 0 deletions aggregation_mode/payments_poller/src/payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::str::FromStr;
use crate::{
config::Config,
db::Db,
metrics::PaymentsPollerMetrics,
types::{AggregationModePaymentService, AggregationModePaymentServiceContract, RpcProvider},
};
use alloy::{
Expand All @@ -21,6 +22,7 @@ pub struct PaymentsPoller {
proof_aggregation_service: AggregationModePaymentServiceContract,
rpc_provider: RpcProvider,
config: Config,
metrics: PaymentsPollerMetrics,
}

impl PaymentsPoller {
Expand All @@ -38,11 +40,19 @@ impl PaymentsPoller {
.get_last_block_fetched()
.map_err(|err| PaymentsPollerError::ReadLastBlockError(err.to_string()));

tracing::info!(
"Starting metrics server on port {}",
config.poller_metrics_port
);
let metrics = PaymentsPollerMetrics::start(config.poller_metrics_port)
.expect("Failed to start metrics server");

Ok(Self {
db,
proof_aggregation_service,
rpc_provider,
config,
metrics,
})
}

Expand Down Expand Up @@ -121,6 +131,8 @@ impl PaymentsPoller {
continue;
};

self.metrics.register_last_processed_block(current_block);

tokio::time::sleep(std::time::Duration::from_secs(
seconds_to_wait_between_polls,
))
Expand Down
7 changes: 6 additions & 1 deletion config-files/config-agg-mode-gateway-ethereum-package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@ db_connection_url: "postgres://postgres:postgres@localhost:5435/"
eth_rpc_url: "http://localhost:8545"
payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe"
network: "devnet"
max_daily_proofs_per_user: 32
max_daily_proofs_per_user: 100
last_block_fetched_filepath: "config-files/proof-aggregator.last_block_fetched.json"

# Metrics
gateway_metrics_port: 9094
poller_metrics_port: 9095
6 changes: 5 additions & 1 deletion config-files/config-agg-mode-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,9 @@ db_connection_url: "postgres://postgres:postgres@localhost:5435/"
eth_rpc_url: "http://localhost:8545"
payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe"
network: "devnet"
max_daily_proofs_per_user: 4
max_daily_proofs_per_user: 100
last_block_fetched_filepath: "config-files/proof-aggregator.last_block_fetched.json"

# Metrics
gateway_metrics_port: 9094
poller_metrics_port: 9095
Loading
Loading