Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,16 @@ agg_mode_run_migrations: agg_mode_docker_up
cargo run --manifest-path ./aggregation_mode/Cargo.toml --release --bin migrate -- postgres://postgres:postgres@localhost:5435/

agg_mode_batcher_start_local: agg_mode_run_migrations
cargo run --manifest-path ./aggregation_mode/Cargo.toml --release --bin agg_mode_batcher -- config-files/config-agg-mode-batcher.yaml
cargo run --manifest-path ./aggregation_mode/Cargo.toml --release --bin batcher -- config-files/config-agg-mode-batcher.yaml

agg_mode_batcher_start_ethereum_package: agg_mode_run_migrations
cargo run --manifest-path ./aggregation_mode/Cargo.toml --release --bin agg_mode_batcher -- config-files/config-agg-mode-batcher-ethereum-package.yaml
cargo run --manifest-path ./aggregation_mode/Cargo.toml --release --bin batcher -- config-files/config-agg-mode-batcher-ethereum-package.yaml

agg_mode_batcher_poller_start_local: agg_mode_run_migrations
cargo run --manifest-path ./aggregation_mode/Cargo.toml --release --bin agg_mode_batcher_poller -- config-files/config-agg-mode-batcher.yaml

agg_mode_batcher_poller_start_ethereum_package: agg_mode_run_migrations
cargo run --manifest-path ./aggregation_mode/Cargo.toml --release --bin agg_mode_batcher_poller -- config-files/config-agg-mode-batcher-ethereum-package.yaml

AGG_MODE_SENDER ?= 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
agg_mode_batcher_send_payment:
Expand Down
25 changes: 21 additions & 4 deletions aggregation_mode/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aggregation_mode/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = ["./batcher", "./proof_aggregator", "./db"]
members = ["./batcher", "./proof_aggregator", "./db", "./payments_poller"]

[workspace.package]
version = "0.1.0"
Expand Down
2 changes: 1 addition & 1 deletion aggregation_mode/batcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "agg_mode_batcher"
name = "batcher"
version = "0.1.0"
edition = "2021"

Expand Down
2 changes: 0 additions & 2 deletions aggregation_mode/batcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use serde::{Deserialize, Serialize};
pub struct Config {
pub port: u16,
pub db_connection_url: String,
pub eth_rpc_url: String,
pub payment_service_address: String,
}

impl Config {
Expand Down
23 changes: 0 additions & 23 deletions aggregation_mode/batcher/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,29 +126,6 @@ impl Db {
.await
}

pub async fn insert_payment_event(
&self,
address: &str,
started_at: &BigDecimal,
amount: &BigDecimal,
valid_until: &BigDecimal,
tx_hash: &str,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tx_hash) DO NOTHING",
)
.bind(address.to_lowercase())
.bind(started_at)
.bind(amount)
.bind(valid_until)
.bind(tx_hash)
.execute(&self.pool)
.await
.map(|_| ())
}

pub async fn has_active_payment_event(
&self,
address: &str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::{
use crate::{
config::Config,
db::Db,
server::types::{GetReceiptsResponse, SubmitProofRequestRisc0, SubmitProofRequestSP1},
types::{GetReceiptsResponse, SubmitProofRequestRisc0, SubmitProofRequestSP1},
verifiers::{verify_sp1_proof, VerificationError},
};

Expand Down
5 changes: 3 additions & 2 deletions aggregation_mode/batcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;
pub mod db;
pub mod payments;
pub mod server;
mod helpers;
pub mod http;
mod types;
mod verifiers;
11 changes: 2 additions & 9 deletions aggregation_mode/batcher/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::env;

use agg_mode_batcher::config::Config;
use agg_mode_batcher::payments::PaymentsPoller;
use agg_mode_batcher::{db::Db, server::http::BatcherServer};
use batcher::{config::Config, db::Db, http::BatcherServer};
use tracing_subscriber::{EnvFilter, FmtSubscriber};

fn read_config_filepath_from_args() -> String {
Expand Down Expand Up @@ -32,14 +30,9 @@ async fn main() {
.await
.expect("db to start");

let payment_poller = PaymentsPoller::new(db.clone(), config.clone());
let http_server = BatcherServer::new(db, config.clone());

let payment_poller_handle = tokio::spawn(async move { payment_poller.start().await });
let http_server_handle = tokio::spawn(async move { http_server.start().await });

// TODO: maybe this could two different processes (started with different commands) instead of being in the same one
// TODO: abort the process if one stops instead of waiting for them both
// TODO: ctrl + c handler for aborting the process should work
let _ = tokio::join!(payment_poller_handle, http_server_handle);
let _ = tokio::join!(http_server_handle);
}
3 changes: 0 additions & 3 deletions aggregation_mode/batcher/src/server/mod.rs

This file was deleted.

18 changes: 18 additions & 0 deletions aggregation_mode/payments_poller/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "agg_mode_batcher_poller"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
aligned-sdk = { workspace = true }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
actix-web = "4"
alloy = { workspace = true }
tokio = { version = "1", features = ["time"]}
# TODO: enable tls
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "uuid", "bigdecimal" ] }
hex = "0.4"
20 changes: 20 additions & 0 deletions aggregation_mode/payments_poller/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::{fs::File, io::Read};

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config {
pub db_connection_url: String,
pub eth_rpc_url: String,
pub payment_service_address: String,
}

impl Config {
pub fn from_file(file_path: &str) -> Result<Config, Box<dyn std::error::Error>> {
let mut file = File::open(file_path)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let config: Config = serde_yaml::from_str(&contents)?;
Ok(config)
}
}
46 changes: 46 additions & 0 deletions aggregation_mode/payments_poller/src/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use sqlx::{postgres::PgPoolOptions, types::BigDecimal, Pool, Postgres};

#[derive(Clone, Debug)]
pub struct Db {
pool: Pool<Postgres>,
}

#[derive(Debug, Clone)]
pub enum DbError {
ConnectError(String),
}

impl Db {
pub async fn try_new(connection_url: &str) -> Result<Self, DbError> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(connection_url)
.await
.map_err(|e| DbError::ConnectError(e.to_string()))?;

Ok(Self { pool })
}

pub async fn insert_payment_event(
&self,
address: &str,
started_at: &BigDecimal,
amount: &BigDecimal,
valid_until: &BigDecimal,
tx_hash: &str,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tx_hash) DO NOTHING",
)
.bind(address.to_lowercase())
.bind(started_at)
.bind(amount)
.bind(valid_until)
.bind(tx_hash)
.execute(&self.pool)
.await
.map(|_| ())
}
}
4 changes: 4 additions & 0 deletions aggregation_mode/payments_poller/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod config;
pub mod db;
pub mod payments;
pub mod types;
38 changes: 38 additions & 0 deletions aggregation_mode/payments_poller/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::env;

use agg_mode_batcher_poller::{config::Config, db::Db, payments::PaymentsPoller};
use tracing_subscriber::{EnvFilter, FmtSubscriber};

fn read_config_filepath_from_args() -> String {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
panic!(
"You must provide a config file. Usage: {} <config-file-path>",
args[0]
);
}

args[1].clone()
}

#[tokio::main]
async fn main() {
let filter = EnvFilter::new("info,sp1_cuda=warn");
let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let config_file_path = read_config_filepath_from_args();
tracing::info!("Loading config from {}...", config_file_path);
let config = Config::from_file(&config_file_path).expect("Config is valid");
tracing::info!("Config loaded");

let db = Db::try_new(&config.db_connection_url)
.await
.expect("db to start");

let payment_poller = PaymentsPoller::new(db.clone(), config.clone());

let payment_poller_handle = tokio::spawn(async move { payment_poller.start().await });

let _ = tokio::join!(payment_poller_handle);
}
Original file line number Diff line number Diff line change
@@ -1,55 +1,16 @@
use std::str::FromStr;

use crate::{config::Config, db::Db};
use crate::{
config::Config,
db::Db,
types::{AggregationModePaymentService, AggregationModePaymentServiceContract, RpcProvider},
};
use alloy::{
primitives::Address,
providers::{Provider, ProviderBuilder},
sol,
};
use sqlx::types::BigDecimal;

sol!(
#[sol(rpc)]
AggregationModePaymentService,
"abi/AggregationModePaymentService.json"
);

type AggregationModePaymentServiceContract =
AggregationModePaymentService::AggregationModePaymentServiceInstance<
alloy::providers::fillers::FillProvider<
alloy::providers::fillers::JoinFill<
alloy::providers::Identity,
alloy::providers::fillers::JoinFill<
alloy::providers::fillers::GasFiller,
alloy::providers::fillers::JoinFill<
alloy::providers::fillers::BlobGasFiller,
alloy::providers::fillers::JoinFill<
alloy::providers::fillers::NonceFiller,
alloy::providers::fillers::ChainIdFiller,
>,
>,
>,
>,
alloy::providers::RootProvider,
>,
>;
type RpcProvider = alloy::providers::fillers::FillProvider<
alloy::providers::fillers::JoinFill<
alloy::providers::Identity,
alloy::providers::fillers::JoinFill<
alloy::providers::fillers::GasFiller,
alloy::providers::fillers::JoinFill<
alloy::providers::fillers::BlobGasFiller,
alloy::providers::fillers::JoinFill<
alloy::providers::fillers::NonceFiller,
alloy::providers::fillers::ChainIdFiller,
>,
>,
>,
>,
alloy::providers::RootProvider,
>;

pub struct PaymentsPoller {
db: Db,
proof_aggregation_service: AggregationModePaymentServiceContract,
Expand Down
Loading
Loading