-
Notifications
You must be signed in to change notification settings - Fork 396
Expand file tree
/
Copy pathdb.rs
More file actions
102 lines (94 loc) · 3.08 KB
/
db.rs
File metadata and controls
102 lines (94 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use db::{orchestrator::DbOrchestrator, retry::RetryConfig};
use sqlx::types::BigDecimal;
// Retry/backoff behavior summary for DB queries (see
// aggregation_mode/db/src/orchestrator.rs:next_back_off_delay for implementation)
//
// 1) Maximum wait between two consecutive attempts:
// The sleep between retries is capped at 30 seconds (RETRY_MAX_DELAY_SECONDS).
//
// 2) Wait before each retry attempt with the current config
// (start = 500ms, factor = 4.0, max retries = 5):
//
// retry 1: 0.5s
// retry 2: 2.0s
// retry 3: 8.0s
// retry 4: 30s (capped; 32s would have been next)
// retry 5: 30s
//
// Worst-case total sleep time across all retries: 0.5 + 2 + 8 + 30 + 30 = 70.5 seconds
// (roughly 5 Ethereum blocks spent waiting), plus the execution time of each DB attempt.
/// Initial delay before the first retry attempt, in milliseconds.
///
/// Passed to the orchestrator as `RetryConfig::min_delay_millis`.
const RETRY_MIN_DELAY_MILLIS: u64 = 500;
/// Exponential backoff multiplier applied to the delay between retries.
///
/// Passed to the orchestrator as `RetryConfig::factor`.
const RETRY_FACTOR: f32 = 4.0;
/// Maximum number of retry attempts.
///
/// Passed to the orchestrator as `RetryConfig::max_times`.
const RETRY_MAX_TIMES: usize = 5;
/// Upper bound on the delay between two retry attempts, in seconds.
///
/// Passed to the orchestrator as `RetryConfig::max_delay_seconds`.
const RETRY_MAX_DELAY_SECONDS: u64 = 30;
/// Database handle for this module.
///
/// Wraps a [`DbOrchestrator`]; every query defined on `Db` is issued through it.
#[derive(Clone, Debug)]
pub struct Db {
// Orchestrator built in `Db::try_new` from the connection URLs and the retry constants.
orchestrator: DbOrchestrator,
}
/// Errors produced while setting up the database handle.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be printed with `{}`
/// and propagated with `?` into `Box<dyn Error>` or other error-wrapping types.
#[derive(Debug, Clone)]
pub enum DbError {
    /// The database orchestrator could not be created; carries the stringified cause.
    ConnectError(String),
}

impl std::fmt::Display for DbError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match on purpose: adding a variant must force an update here.
        match self {
            DbError::ConnectError(cause) => {
                write!(f, "failed to connect to database: {cause}")
            }
        }
    }
}

impl std::error::Error for DbError {}
impl Db {
    /// Builds a [`Db`] on top of a [`DbOrchestrator`] for the given connection URLs.
    ///
    /// The orchestrator is configured with the retry constants declared in this file
    /// (`RETRY_MIN_DELAY_MILLIS`, `RETRY_FACTOR`, `RETRY_MAX_TIMES`,
    /// `RETRY_MAX_DELAY_SECONDS`).
    ///
    /// # Errors
    ///
    /// Returns [`DbError::ConnectError`] holding the stringified orchestrator error when
    /// `DbOrchestrator::try_new` fails.
    pub async fn try_new(connection_urls: &[String]) -> Result<Self, DbError> {
        let retry_config = RetryConfig {
            min_delay_millis: RETRY_MIN_DELAY_MILLIS,
            factor: RETRY_FACTOR,
            max_times: RETRY_MAX_TIMES,
            max_delay_seconds: RETRY_MAX_DELAY_SECONDS,
        };

        match DbOrchestrator::try_new(connection_urls, retry_config) {
            Ok(orchestrator) => Ok(Self { orchestrator }),
            Err(cause) => Err(DbError::ConnectError(cause.to_string())),
        }
    }

    /// Records a payment event in the `payment_events` table.
    ///
    /// The address is lowercased before it is bound; all other values are bound as given.
    /// If a row with the same `tx_hash` already exists, the statement does nothing
    /// (`ON CONFLICT (tx_hash) DO NOTHING`).
    ///
    /// The statement runs through the orchestrator's `query`.
    /// NOTE(review): presumably this applies the retry policy set in `try_new` — confirm
    /// against the orchestrator implementation.
    ///
    /// # Errors
    ///
    /// Returns the [`sqlx::Error`] reported by the orchestrator's `query`.
    pub async fn insert_payment_event(
        &self,
        address: &str,
        started_at: &BigDecimal,
        amount: &BigDecimal,
        valid_until: &BigDecimal,
        tx_hash: &str,
    ) -> Result<(), sqlx::Error> {
        // Continuation lines stay at column 0 so the SQL text is unchanged.
        const INSERT_PAYMENT_EVENT_SQL: &str =
            "INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tx_hash) DO NOTHING";

        self.orchestrator
            .query(async |pool| {
                let insert = sqlx::query(INSERT_PAYMENT_EVENT_SQL)
                    .bind(address.to_lowercase())
                    .bind(started_at)
                    .bind(amount)
                    .bind(valid_until)
                    .bind(tx_hash);

                // Only success or failure matters; the execution result is discarded.
                insert.execute(&pool).await.map(|_| ())
            })
            .await
    }

    /// Counts the rows of `payment_events` that are active at `epoch`.
    ///
    /// A row counts as active when `started_at < epoch < valid_until`; both bounds are
    /// exclusive.
    ///
    /// # Errors
    ///
    /// Returns the [`sqlx::Error`] reported by the orchestrator's `query`.
    pub async fn count_total_active_subscriptions(
        &self,
        epoch: BigDecimal,
    ) -> Result<i64, sqlx::Error> {
        // Continuation lines stay at column 0 so the SQL text is unchanged.
        const COUNT_ACTIVE_SUBSCRIPTIONS_SQL: &str = "
SELECT COUNT(*)
FROM payment_events
WHERE started_at < $1 AND $1 < valid_until";

        self.orchestrator
            .query(async |pool| {
                let count_query =
                    sqlx::query_scalar::<_, i64>(COUNT_ACTIVE_SUBSCRIPTIONS_SQL).bind(&epoch);
                count_query.fetch_one(&pool).await
            })
            .await
    }
}