Skip to content

Commit 2fad38a

Browse files
author
kopturovdim
committed
bug upg: fix pg to userver sockaddr construction
Failed test before fix <https://nda.ya.ru/t/tLXropkU7YGdiX> Tests: unittests, testing commit_hash:e8c58b69ba609a2ace0d52af7975d01dd33adcd4
1 parent ed74e7d commit 2fad38a

File tree

5 files changed

+152
-58
lines changed

5 files changed

+152
-58
lines changed

.mapping.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3683,6 +3683,7 @@
36833683
"postgresql/src/storages/postgres/detail/pg_impl_types.hpp":"taxi/uservices/userver/postgresql/src/storages/postgres/detail/pg_impl_types.hpp",
36843684
"postgresql/src/storages/postgres/detail/pg_message_severity.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/detail/pg_message_severity.cpp",
36853685
"postgresql/src/storages/postgres/detail/pg_message_severity.hpp":"taxi/uservices/userver/postgresql/src/storages/postgres/detail/pg_message_severity.hpp",
3686+
"postgresql/src/storages/postgres/detail/pg_version.hpp":"taxi/uservices/userver/postgresql/src/storages/postgres/detail/pg_version.hpp",
36863687
"postgresql/src/storages/postgres/detail/pool.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/detail/pool.cpp",
36873688
"postgresql/src/storages/postgres/detail/pool.hpp":"taxi/uservices/userver/postgresql/src/storages/postgres/detail/pool.hpp",
36883689
"postgresql/src/storages/postgres/detail/query_parameters.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/detail/query_parameters.cpp",
@@ -3743,6 +3744,7 @@
37433744
"postgresql/src/storages/postgres/tests/arrays_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/arrays_pgtest.cpp",
37443745
"postgresql/src/storages/postgres/tests/bitstring_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/bitstring_pgtest.cpp",
37453746
"postgresql/src/storages/postgres/tests/bytea_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/bytea_pgtest.cpp",
3747+
"postgresql/src/storages/postgres/tests/cancel_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/cancel_pgtest.cpp",
37463748
"postgresql/src/storages/postgres/tests/chrono_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/chrono_pgtest.cpp",
37473749
"postgresql/src/storages/postgres/tests/cluster_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/cluster_pgtest.cpp",
37483750
"postgresql/src/storages/postgres/tests/composite_types_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/composite_types_pgtest.cpp",

postgresql/src/storages/postgres/detail/cancel.cpp

Lines changed: 81 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,102 +1,125 @@
11
#include <storages/postgres/detail/cancel.hpp>
22

3+
// Internal structures of libpq to send cancel packet using async non blocking socket
4+
35
#include <array>
46
#include <cstdint>
57

8+
#include <sys/socket.h>
9+
610
#include <userver/engine/io/sockaddr.hpp>
711
#include <userver/engine/io/socket.hpp>
812
#include <userver/storages/postgres/exceptions.hpp>
913
#include <userver/tracing/span.hpp>
1014

11-
#ifdef __clang__
12-
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
13-
#define USERVER_IMPL_DISABLE_MSAN __attribute__((no_sanitize_memory))
15+
#include <storages/postgres/detail/pg_version.hpp>
16+
17+
// NOLINTNEXTLINE(modernize-use-using)
18+
typedef struct {
19+
struct sockaddr_storage addr;
20+
socklen_t salen;
21+
} SockAddr;
22+
23+
#if USERVER_LIBPQ_VERSION >= 180000
24+
25+
struct pg_cancel {
26+
SockAddr raddr; /* Remote address */
27+
int be_pid; /* PID of to-be-canceled backend */
28+
int pgtcp_user_timeout; /* tcp user timeout */
29+
int keepalives; /* use TCP keepalives? */
30+
int keepalives_idle; /* time between TCP keepalives */
31+
int keepalives_interval; /* time between TCP keepalive
32+
* retransmits */
33+
int keepalives_count; /* maximum number of TCP keepalive
34+
* retransmits */
35+
36+
/* Pre-constructed cancel request packet starts here */
37+
int32_t cancel_pkt_len; /* in network byte order */
38+
char cancel_req[/*FLEXIBLE_ARRAY_MEMBER*/]; /* CancelRequestPacket */
39+
};
40+
1441
#else
15-
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
16-
#define USERVER_IMPL_DISABLE_MSAN
17-
#endif
1842

1943
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
2044
#define PG_PROTOCOL_MAJOR(v) ((v) >> 16)
2145
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
2246
#define PG_PROTOCOL_MINOR(v) ((v) & 0x0000ffff)
2347
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
2448
#define PG_PROTOCOL(m, n) (((m) << 16) | (n))
25-
2649
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
2750
#define CANCEL_REQUEST_CODE PG_PROTOCOL(1234, 5678)
2851

29-
using ACCEPT_TYPE_ARG3 = socklen_t;
30-
31-
struct PgSockaddrStorage {
32-
union {
33-
struct sockaddr sa; /* get the system-dependent fields */
34-
int64_t ss_align; /* ensures struct is properly aligned */
35-
char ss_pad[128]; /* ensures struct has desired size */
36-
} ss_stuff;
37-
};
38-
3952
// NOLINTNEXTLINE(modernize-use-using)
40-
typedef struct {
41-
struct PgSockaddrStorage addr;
42-
ACCEPT_TYPE_ARG3 salen;
43-
} SockAddr;
53+
typedef struct CancelRequestPacket {
54+
/* Note that each field is stored in network byte order! */
55+
uint32_t cancelRequestCode; /* code to identify a cancel request */
56+
uint32_t backendPID; /* PID of client's backend */
57+
uint32_t cancelAuthCode; /* secret key to authorize cancel */
58+
} CancelRequestPacket;
4459

4560
struct pg_cancel {
4661
SockAddr raddr;
4762
int be_pid;
4863
int be_key;
4964
};
5065

51-
// NOLINTNEXTLINE(modernize-use-using)
52-
typedef struct CancelRequestPacket {
53-
/* Note that each field is stored in network byte order! */
54-
uint32_t cancel_request_code; /* code to identify a cancel request */
55-
uint32_t backend_pid; /* PID of client's backend */
56-
uint32_t cancel_auth_code; /* secret key to authorize cancel */
57-
} CancelRequestPacket;
58-
59-
struct CancelPacket {
60-
uint32_t packetlen;
61-
CancelRequestPacket cp;
62-
};
66+
#endif
6367

6468
USERVER_NAMESPACE_BEGIN
6569

6670
namespace storages::postgres::detail {
6771

6872
namespace {
6973

70-
// TODO(TAXICOMMON-11213) investigate why memory sanitizer complains.
71-
USERVER_IMPL_DISABLE_MSAN CancelPacket MakeCancelPacket(const PGcancel& cn) noexcept {
72-
CancelPacket cp{};
73-
cp.packetlen = sizeof(cp);
74-
cp.cp.cancel_request_code = static_cast<uint32_t>(htonl(CANCEL_REQUEST_CODE));
75-
cp.cp.backend_pid = htonl(cn.be_pid);
76-
cp.cp.cancel_auth_code = htonl(cn.be_key);
77-
return cp;
74+
void SendCancelPacket(engine::io::Socket& tmp_sock, engine::Deadline deadline, const PGcancel& cn) {
75+
#if USERVER_LIBPQ_VERSION >= 180000
76+
// reference code: libpq/fe-cancel.c
77+
// In libpq cancel packet is stored directly in pg_cancel struct
78+
// starting from cancel_pkt_len field
79+
// After this field pg_cancel contains flexible array with binary prepared packet
80+
// NOTE: cancel_pkt_len in pg_cancel stored in network byte order
81+
size_t cancel_pkt_len = ntohl(cn.cancel_pkt_len);
82+
auto ret = tmp_sock.SendAll(reinterpret_cast<const char*>(&cn.cancel_pkt_len), cancel_pkt_len, deadline);
83+
if (ret != cancel_pkt_len) {
84+
throw CommandError(fmt::format(
85+
"SendCancelPacket: Failed to call SendAll(), expected bytes to send {}, actual bytes sent {}",
86+
cancel_pkt_len,
87+
ret
88+
));
89+
}
90+
#else
91+
struct {
92+
uint32_t packetlen;
93+
CancelRequestPacket cp;
94+
} crp;
95+
96+
crp.packetlen = htonl(sizeof(crp));
97+
crp.cp.cancelRequestCode = htonl(CANCEL_REQUEST_CODE);
98+
crp.cp.backendPID = htonl(cn.be_pid);
99+
crp.cp.cancelAuthCode = htonl(cn.be_key);
100+
101+
auto ret = tmp_sock.SendAll(&crp, sizeof(crp), deadline);
102+
if (ret != sizeof(crp)) {
103+
throw CommandError("SendAll()");
104+
}
105+
#endif
78106
}
79107

80108
} // namespace
81109

82-
USERVER_IMPL_DISABLE_MSAN void Cancel(PGcancel* cn, engine::Deadline deadline) {
110+
void Cancel(PGcancel* cn, engine::Deadline deadline) {
83111
tracing::Span span{"pg_cancel"};
84112
if (!cn) {
85113
return;
86114
}
87115

88-
engine::io::Sockaddr addr;
89-
memcpy(addr.Data(), &cn->raddr, std::min<size_t>(sizeof(cn->raddr), addr.Size()));
90-
116+
engine::io::Sockaddr addr(&cn->raddr.addr);
91117
engine::io::Socket tmp_sock(addr.Domain(), engine::io::SocketType::kStream);
92118

93119
tmp_sock.Connect(addr, deadline);
120+
LOG_DEBUG() << "Connected to " << addr.PrimaryAddressString() << " on port " << addr.Port();
94121

95-
CancelPacket cp = MakeCancelPacket(*cn);
96-
auto ret = tmp_sock.SendAll(&cp, sizeof(cp), deadline);
97-
if (ret != sizeof(cp)) {
98-
throw CommandError("SendAll()");
99-
}
122+
SendCancelPacket(tmp_sock, deadline, *cn);
100123

101124
/*
102125
* Comment from libpq's sources, fe-connect.c, inside internal_cancel():
@@ -107,8 +130,14 @@ USERVER_IMPL_DISABLE_MSAN void Cancel(PGcancel* cn, engine::Deadline deadline) {
107130
* one we thought we were canceling. Note we don't actually expect this
108131
* read to obtain any data, we are just waiting for EOF to be signaled.
109132
*/
110-
std::array<char, 1024> c{};
111-
[[maybe_unused]] auto ret2 = tmp_sock.RecvAll(c.data(), c.size(), deadline);
133+
try {
134+
std::array<char, 1024> c{};
135+
[[maybe_unused]] auto ret = tmp_sock.RecvAll(c.data(), c.size(), deadline);
136+
} catch (const engine::io::IoSystemError& ex) {
137+
LOG_LIMITED_INFO() << "Got exception during RecvAll after pg_cancel: " << ex;
138+
// Do not propagate exception here, because postmaster sends no response
139+
// for cancel packets, just silently close socket
140+
}
112141
}
113142

114143
} // namespace storages::postgres::detail

postgresql/src/storages/postgres/detail/pg_connection_wrapper.cpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,6 @@ auto PQXsendQueryPrepared(PGconn* conn, const char* stmtName, int nParams, const
1616
}
1717
#endif
1818

19-
#ifdef ARCADIA_ROOT
20-
#define USERVER_LIBPQ_VERSION PG_VERSION_NUM
21-
#else
22-
#include <userver_libpq_version.hpp> // Y_IGNORE
23-
#endif
24-
2519
#include <userver/concurrent/background_task_storage.hpp>
2620
#include <userver/crypto/openssl.hpp>
2721
#include <userver/engine/task/cancel.hpp>
@@ -32,6 +26,7 @@ auto PQXsendQueryPrepared(PGconn* conn, const char* stmtName, int nParams, const
3226

3327
#include <storages/postgres/detail/cancel.hpp>
3428
#include <storages/postgres/detail/pg_message_severity.hpp>
29+
#include <storages/postgres/detail/pg_version.hpp>
3530
#include <storages/postgres/detail/tracing_tags.hpp>
3631
#include <userver/storages/postgres/dsn.hpp>
3732
#include <userver/storages/postgres/exceptions.hpp>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <pg_config.h>
4+
5+
#ifdef ARCADIA_ROOT
6+
#define USERVER_LIBPQ_VERSION PG_VERSION_NUM
7+
#else
8+
#include <userver_libpq_version.hpp> // Y_IGNORE
9+
#endif
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#include <storages/postgres/tests/util_pgtest.hpp>
2+
#include <userver/engine/async.hpp>
3+
#include <userver/engine/sleep.hpp>
4+
5+
USERVER_NAMESPACE_BEGIN
6+
7+
namespace pg = storages::postgres;
8+
9+
namespace {
10+
11+
UTEST_P(PostgreConnection, Cancel) {
12+
/*
13+
* IF THIS TEST FAILS DURING UPDATE OF libpq VERSION IN CONTRIB,
14+
* WRITE SOMEONE FROM TPS-60916
15+
* We verify the logic of request cancellation in PostgreSQL.
16+
* To do this, we start an asynchronous task that performs a long sleep on the PostgreSQL side (putting the
17+
* connection into the busy state). Then we cancel this task. The connection should remain busy. Next, we issue a
18+
* query cancellation request with a timeout shorter than the sleep duration on the PostgreSQL side (this is
19+
* important!). If the query is canceled successfully, the connection will transition to the idle state. If the
20+
* cancellation does not work, we will catch a timeout exception during DiscardInput. This happens specifically
21+
* because the cancel timeout is shorter than the duration passed to pg_sleep.
22+
*/
23+
auto& conn = GetConn();
24+
CheckConnection(conn);
25+
26+
// Setup query timeout for 1 minute
27+
const auto cmd_ctrl = pg::CommandControl{
28+
/*network_timeout*/ std::chrono::seconds{60},
29+
/*statement_timeout*/ std::chrono::seconds{60},
30+
};
31+
32+
// sleep on postgres side for 1 minute
33+
auto task = engine::CriticalAsyncNoSpan([&conn, &cmd_ctrl]() {
34+
LOG_DEBUG() << "Enter pg_sleep";
35+
conn->Execute("select pg_sleep(60)", /*query_params*/ {}, cmd_ctrl);
36+
LOG_DEBUG() << "Return from pg_sleep";
37+
});
38+
39+
// short sleep to ensure select in async task was started
40+
engine::SleepFor(std::chrono::milliseconds{10});
41+
ASSERT_FALSE(conn->IsIdle()) << "connection must be in busy state";
42+
43+
// cancel task
44+
task.SyncCancel();
45+
ASSERT_FALSE(conn->IsIdle()) << "connection must remain in busy state";
46+
47+
// try to cleanup with short timeout
48+
// if cancel did not work, here we will get timeout exception
49+
// because cancel timeout (5s) is less than pg_sleep(60s)
50+
ASSERT_NO_THROW(conn->CancelAndCleanup(std::chrono::seconds{5}));
51+
52+
ASSERT_TRUE(conn->IsIdle()) << "connection must be in idle state";
53+
54+
ASSERT_NO_THROW(conn->Execute("select 1", /*query_params*/ {}, cmd_ctrl)) << "connection must be usable";
55+
}
56+
57+
} // namespace
58+
59+
USERVER_NAMESPACE_END

0 commit comments

Comments
 (0)