Skip to content

Commit 945a40a

Browse files
author
lexeyo
committed
refactor postgres: refactor ConnlimitWatchdog
commit_hash:ba9e7a81d31e0627193354f9db2ad50343876c82
1 parent da3ec9c commit 945a40a

File tree

2 files changed

+102
-108
lines changed

2 files changed

+102
-108
lines changed

postgresql/src/storages/postgres/connlimit_watchdog.cpp

Lines changed: 87 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,29 @@ namespace storages::postgres {
99

1010
namespace {
1111
constexpr CommandControl kCommandControl{std::chrono::seconds(2), std::chrono::seconds(2)};
12-
constexpr size_t kTestsuiteConnlimit = 100;
13-
constexpr size_t kReservedConn = 5;
12+
constexpr std::size_t kTestsuiteConnlimit = 100;
13+
constexpr std::size_t kReservedConn = 5;
1414

1515
constexpr int kMaxStepsWithError = 3;
16-
constexpr size_t kFallbackConnlimit = 20;
16+
constexpr std::size_t kFallbackConnlimit = 20;
17+
18+
std::size_t GetMaxConnections(Transaction& trx) {
19+
const auto max_server_connections = USERVER_NAMESPACE::utils::FromString<
20+
ssize_t>(trx.Execute("SHOW max_connections;").AsSingleRow<std::string>());
21+
auto max_user_connections =
22+
trx.Execute("SELECT rolconnlimit FROM pg_roles WHERE rolname = current_user").AsSingleRow<ssize_t>();
23+
if (max_user_connections < 0) {
24+
max_user_connections = max_server_connections;
25+
}
26+
std::size_t max_connections = std::min(max_server_connections, max_user_connections);
27+
28+
if (max_connections > kReservedConn) {
29+
max_connections -= kReservedConn;
30+
} else {
31+
max_connections = 1;
32+
}
33+
return max_connections;
34+
}
1735

1836
} // namespace
1937

@@ -34,7 +52,7 @@ ConnlimitWatchdog::ConnlimitWatchdog(
3452

3553
void ConnlimitWatchdog::Start() {
3654
try {
37-
auto trx = cluster_.Begin({ClusterHostType::kMaster}, {}, kCommandControl);
55+
auto trx = BeginTransaction();
3856
trx.Execute(R"(
3957
CREATE TABLE IF NOT EXISTS u_clients (
4058
hostname TEXT PRIMARY KEY,
@@ -69,117 +87,65 @@ void ConnlimitWatchdog::Start() {
6987
}
7088

7189
void ConnlimitWatchdog::StepV1() {
72-
static auto hostname = hostinfo::blocking::GetRealHostName();
73-
try {
74-
auto trx = cluster_.Begin({ClusterHostType::kMaster}, {}, kCommandControl);
75-
76-
auto max_connections1 = USERVER_NAMESPACE::utils::FromString<
77-
ssize_t>(trx.Execute("SHOW max_connections;").AsSingleRow<std::string>());
78-
auto max_connections2 =
79-
trx.Execute("SELECT rolconnlimit FROM pg_roles WHERE rolname = current_user").AsSingleRow<ssize_t>();
80-
if (max_connections2 < 0) {
81-
max_connections2 = max_connections1;
82-
}
83-
size_t max_connections = std::min(max_connections1, max_connections2);
90+
static const Query kUpsertClientMaxConnections{
91+
R"(
92+
INSERT INTO u_clients (hostname, updated, max_connections)
93+
VALUES ($1, NOW(), $2)
94+
ON CONFLICT (hostname) DO UPDATE SET updated = NOW(), max_connections = $2
95+
)",
96+
"UpsertMaxConnectionsV1"
97+
};
98+
99+
static const Query kSelectInstances{
100+
R"(
101+
SELECT count(*) FROM u_clients WHERE updated >= NOW() - make_interval(secs => 15)
102+
)",
103+
"SelectInstancesV1"
104+
};
84105

85-
if (max_connections > kReservedConn) {
86-
max_connections -= kReservedConn;
87-
} else {
88-
max_connections = 1;
89-
}
106+
static auto hostname = hostinfo::blocking::GetRealHostName();
90107

91-
trx.Execute(
92-
"INSERT INTO u_clients (hostname, updated, max_connections) VALUES "
93-
"($1, "
94-
"NOW(), $2) ON CONFLICT (hostname) DO UPDATE SET updated = NOW(), "
95-
"max_connections = $2",
96-
hostname,
97-
static_cast<int>(GetConnlimit())
98-
);
99-
auto instances =
100-
trx.Execute(
101-
"SELECT count(*) FROM u_clients WHERE updated >= "
102-
"NOW() - make_interval(secs => 15)"
103-
)
104-
.AsSingleRow<int>();
105-
if (instances == 0) {
106-
instances = 1;
107-
}
108+
DoStep(hostname, kUpsertClientMaxConnections, kSelectInstances);
109+
}
108110

109-
auto connlimit = max_connections / instances;
110-
if (connlimit == 0) {
111-
connlimit = 1;
112-
}
113-
LOG((connlimit_ == connlimit) ? logging::Level::kDebug : logging::Level::kWarning
114-
) << "max_connections = "
115-
<< max_connections << ", instances = " << instances << ", connlimit = " << connlimit;
116-
connlimit_ = connlimit;
111+
void ConnlimitWatchdog::StepV2() {
112+
static const Query kUpsertClientMaxConnections{
113+
R"(
114+
INSERT INTO u_clients (hostname, updated, max_connections, cur_user)
115+
VALUES ($1, NOW(), $2, current_user)
116+
ON CONFLICT (hostname) DO UPDATE SET updated = NOW(), max_connections = $2, cur_user = current_user
117+
)",
118+
"UpsertMaxConnectionsV2"
119+
};
117120

118-
trx.Commit();
119-
steps_with_errors_ = 0;
120-
} catch (const Error& e) {
121-
if (++steps_with_errors_ > kMaxStepsWithError) {
122-
/*
123-
* Something's wrong with PG server. Try to lower the load by lowering
124-
* max connection to a small value. Active connections will be gracefully
125-
* closed. When the server returns the response, we'll get the real
126-
* connlimit value. The period with "too low max_connections" should be
127-
* relatively small.
128-
*/
129-
connlimit_ = kFallbackConnlimit;
130-
}
131-
}
121+
static const Query kSelectInstances{
122+
R"(
123+
SELECT count(*) FROM u_clients WHERE updated >= NOW() - make_interval(secs => 15) AND (cur_user = current_user OR cur_user is NULL)
124+
)",
125+
"SelectInstancesV2"
126+
};
132127

133-
on_new_connlimit_();
128+
DoStep(host_name_, kUpsertClientMaxConnections, kSelectInstances);
134129
}
135130

136-
void ConnlimitWatchdog::StepV2() {
137-
static auto hostname = hostinfo::blocking::GetRealHostName();
138-
try {
139-
auto trx = cluster_.Begin({ClusterHostType::kMaster}, {}, kCommandControl);
140-
141-
auto max_connections1 = USERVER_NAMESPACE::utils::FromString<
142-
ssize_t>(trx.Execute("SHOW max_connections;").AsSingleRow<std::string>());
143-
auto max_connections2 =
144-
trx.Execute("SELECT rolconnlimit FROM pg_roles WHERE rolname = current_user").AsSingleRow<ssize_t>();
145-
if (max_connections2 < 0) {
146-
max_connections2 = max_connections1;
147-
}
148-
size_t max_connections = std::min(max_connections1, max_connections2);
131+
void ConnlimitWatchdog::Stop() { periodic_.Stop(); }
149132

150-
if (max_connections > kReservedConn) {
151-
max_connections -= kReservedConn;
152-
} else {
153-
max_connections = 1;
154-
}
133+
std::size_t ConnlimitWatchdog::GetConnlimit() const noexcept { return connlimit_.load(); }
155134

156-
trx.Execute(
157-
R"(
158-
INSERT INTO u_clients (hostname, updated, max_connections, cur_user) VALUES
159-
($1, NOW(), $2, current_user) ON CONFLICT (hostname) DO UPDATE SET updated = NOW(), max_connections = $2, cur_user = current_user
160-
)",
161-
host_name_,
162-
static_cast<int>(GetConnlimit())
163-
);
135+
void ConnlimitWatchdog::DoStep(
136+
const std::string& hostname,
137+
const Query& update_max_connections_query,
138+
const Query& select_instances_query
139+
) {
140+
try {
141+
auto trx = BeginTransaction();
164142

165-
auto
166-
instances =
167-
trx.Execute(
168-
R"(SELECT count(*) FROM u_clients WHERE updated >= NOW() - make_interval(secs => 15) AND (cur_user = current_user OR cur_user is NULL))"
169-
)
170-
.AsSingleRow<int>();
171-
if (instances == 0) {
172-
instances = 1;
173-
}
143+
const auto max_connections = GetMaxConnections(trx);
174144

175-
auto connlimit = max_connections / instances;
176-
if (connlimit == 0) {
177-
connlimit = 1;
178-
}
179-
LOG((connlimit_ == connlimit) ? logging::Level::kDebug : logging::Level::kWarning
180-
) << "max_connections = "
181-
<< max_connections << ", instances = " << instances << ", connlimit = " << connlimit;
182-
connlimit_ = connlimit;
145+
trx.Execute(update_max_connections_query, hostname, static_cast<int>(GetConnlimit()));
146+
auto instances = trx.Execute(select_instances_query).AsSingleRow<int>();
147+
148+
UpdateConnectionsLimit(max_connections, instances);
183149

184150
trx.Commit();
185151
steps_with_errors_ = 0;
@@ -199,9 +165,24 @@ void ConnlimitWatchdog::StepV2() {
199165
on_new_connlimit_();
200166
}
201167

202-
void ConnlimitWatchdog::Stop() { periodic_.Stop(); }
168+
Transaction ConnlimitWatchdog::BeginTransaction() {
169+
return cluster_.Begin({ClusterHostType::kMaster}, {}, kCommandControl);
170+
}
171+
172+
void ConnlimitWatchdog::UpdateConnectionsLimit(std::size_t max_connections, std::size_t instances) {
173+
if (instances == 0) {
174+
instances = 1;
175+
}
203176

204-
size_t ConnlimitWatchdog::GetConnlimit() const { return connlimit_.load(); }
177+
auto new_connlimit = max_connections / instances;
178+
if (new_connlimit == 0) {
179+
new_connlimit = 1;
180+
}
181+
auto previous_connlimit = connlimit_.exchange(new_connlimit);
182+
LOG((previous_connlimit == new_connlimit) ? logging::Level::kDebug : logging::Level::kWarning
183+
) << "max_connections = "
184+
<< max_connections << ", instances = " << instances << ", connlimit = " << new_connlimit;
185+
}
205186

206187
} // namespace storages::postgres
207188

postgresql/src/storages/postgres/connlimit_watchdog.hpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
#pragma once
22

33
#include <atomic>
4+
#include <cstddef>
45

56
#include <userver/hostinfo/blocking/get_hostname.hpp>
7+
#include <userver/storages/postgres/postgres_fwd.hpp>
8+
#include <userver/storages/postgres/query.hpp>
69
#include <userver/testsuite/tasks.hpp>
710
#include <userver/utils/periodic_task.hpp>
811

@@ -32,11 +35,21 @@ class ConnlimitWatchdog final {
3235
void StepV1();
3336
void StepV2();
3437

35-
size_t GetConnlimit() const;
38+
std::size_t GetConnlimit() const noexcept;
3639

3740
private:
41+
Transaction BeginTransaction();
42+
43+
void UpdateConnectionsLimit(std::size_t max_connections, std::size_t instances);
44+
45+
void DoStep(
46+
const std::string& hostname,
47+
const Query& update_max_connections_query,
48+
const Query& select_instances_query
49+
);
50+
3851
detail::ClusterImpl& cluster_;
39-
std::atomic<size_t> connlimit_;
52+
std::atomic<std::size_t> connlimit_;
4053
std::function<void()> on_new_connlimit_;
4154
testsuite::TestsuiteTasks& testsuite_tasks_;
4255
int steps_with_errors_{0};

0 commit comments

Comments
 (0)