Skip to content

Commit 7ccf37b

Browse files
sav-darobot-piglet
authored andcommitted
feat rabbitmq: heartbeat and message headers support
- Add support for custom message headers - Enable configurable heartbeat interval for connection health checks #137 --- Pull Request resolved: #1129 commit_hash:c4879c75cbdd4d9ba225b84a8d6d13fe20746a58
1 parent 14445b0 commit 7ccf37b

19 files changed

+727
-51
lines changed

.mapping.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3825,6 +3825,7 @@
38253825
"rabbitmq/include/userver/urabbitmq/typedefs.hpp":"taxi/uservices/userver/rabbitmq/include/userver/urabbitmq/typedefs.hpp",
38263826
"rabbitmq/library.yaml":"taxi/uservices/userver/rabbitmq/library.yaml",
38273827
"rabbitmq/src/tests/admin_rmqtest.cpp":"taxi/uservices/userver/rabbitmq/src/tests/admin_rmqtest.cpp",
3828+
"rabbitmq/src/tests/header_value_rmqtest.cpp":"taxi/uservices/userver/rabbitmq/src/tests/header_value_rmqtest.cpp",
38283829
"rabbitmq/src/tests/invalid_auth_rmqtest.cpp":"taxi/uservices/userver/rabbitmq/src/tests/invalid_auth_rmqtest.cpp",
38293830
"rabbitmq/src/tests/outage_rmqtest.cpp":"taxi/uservices/userver/rabbitmq/src/tests/outage_rmqtest.cpp",
38303831
"rabbitmq/src/tests/publish_consume_rmqtest.cpp":"taxi/uservices/userver/rabbitmq/src/tests/publish_consume_rmqtest.cpp",
@@ -3860,6 +3861,8 @@
38603861
"rabbitmq/src/urabbitmq/impl/amqp_connection_handler.hpp":"taxi/uservices/userver/rabbitmq/src/urabbitmq/impl/amqp_connection_handler.hpp",
38613862
"rabbitmq/src/urabbitmq/impl/deferred_wrapper.cpp":"taxi/uservices/userver/rabbitmq/src/urabbitmq/impl/deferred_wrapper.cpp",
38623863
"rabbitmq/src/urabbitmq/impl/deferred_wrapper.hpp":"taxi/uservices/userver/rabbitmq/src/urabbitmq/impl/deferred_wrapper.hpp",
3864+
"rabbitmq/src/urabbitmq/impl/header_value.cpp":"taxi/uservices/userver/rabbitmq/src/urabbitmq/impl/header_value.cpp",
3865+
"rabbitmq/src/urabbitmq/impl/header_value.hpp":"taxi/uservices/userver/rabbitmq/src/urabbitmq/impl/header_value.hpp",
38633866
"rabbitmq/src/urabbitmq/impl/io/socket_reader.cpp":"taxi/uservices/userver/rabbitmq/src/urabbitmq/impl/io/socket_reader.cpp",
38643867
"rabbitmq/src/urabbitmq/impl/io/socket_reader.hpp":"taxi/uservices/userver/rabbitmq/src/urabbitmq/impl/io/socket_reader.hpp",
38653868
"rabbitmq/src/urabbitmq/impl/response_awaiter.cpp":"taxi/uservices/userver/rabbitmq/src/urabbitmq/impl/response_awaiter.cpp",

rabbitmq/functional_tests/basic_chaos/rabbitmq_service.cpp

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#include <userver/components/component_context.hpp>
88
#include <userver/components/minimal_server_component_list.hpp>
99
#include <userver/concurrent/variable.hpp>
10+
#include <userver/formats/json.hpp>
11+
#include <userver/formats/parse/common_containers.hpp>
1012
#include <userver/formats/serialize/common_containers.hpp>
1113
#include <userver/server/handlers/http_handler_base.hpp>
1214
#include <userver/server/handlers/tests_control.hpp>
@@ -26,8 +28,7 @@ class ChaosProducer final : public components::LoggableComponentBase {
2628

2729
ChaosProducer(const components::ComponentConfig& config, const components::ComponentContext& context)
2830
: components::LoggableComponentBase{config, context},
29-
rabbit_client_{context.FindComponent<components::RabbitMQ>("chaos-rabbit").GetClient()}
30-
{
31+
rabbit_client_{context.FindComponent<components::RabbitMQ>("chaos-rabbit").GetClient()} {
3132
const auto setup_deadline = engine::Deadline::FromDuration(kDefaultOperationTimeout);
3233

3334
auto admin_channel = rabbit_client_->GetAdminChannel(setup_deadline);
@@ -77,9 +78,7 @@ class ChaosConsumer final : public components::ComponentBase {
7778
static constexpr std::string_view kName{"chaos-consumer"};
7879

7980
ChaosConsumer(const components::ComponentConfig& config, const components::ComponentContext& context)
80-
: components::ComponentBase{config, context},
81-
consumer_{config, context, messages_}
82-
{
81+
: components::ComponentBase{config, context}, consumer_{config, context, messages_} {
8382
Start();
8483
}
8584

@@ -119,8 +118,7 @@ class ChaosConsumer final : public components::ComponentBase {
119118
)
120119
: urabbitmq::
121120
ConsumerBase{context.FindComponent<components::RabbitMQ>(config["rabbit_name"].As<std::string>()).GetClient(), ParseSettings(config)},
122-
messages_{messages}
123-
{}
121+
messages_{messages} {}
124122

125123
protected:
126124
void Process(urabbitmq::ConsumedMessage msg) override {
@@ -150,8 +148,7 @@ class ChaosHandler final : public server::handlers::HttpHandlerBase {
150148
ChaosHandler(const components::ComponentConfig& config, const components::ComponentContext& context)
151149
: server::handlers::HttpHandlerBase{config, context},
152150
producer_{context.FindComponent<ChaosProducer>()},
153-
consumer_{context.FindComponent<ChaosConsumer>()}
154-
{}
151+
consumer_{context.FindComponent<ChaosConsumer>()} {}
155152

156153
std::string HandleRequestThrow(const server::http::HttpRequest& request, server::request::RequestContext&)
157154
const override {
@@ -178,6 +175,13 @@ class ChaosHandler final : public server::handlers::HttpHandlerBase {
178175
throw server::handlers::ClientError{server::handlers::ExternalBody{"No 'message' query argument"}};
179176
}
180177
urabbitmq::Envelope envelope{message, urabbitmq::MessageType::kTransient, {}, {}, {}};
178+
if (!request.RequestBody().empty()) {
179+
const auto request_json = formats::json::FromString(request.RequestBody());
180+
if (request_json.HasMember("headers")) {
181+
envelope
182+
.headers = request_json["headers"].As<std::unordered_map<std::string, urabbitmq::HeaderValue>>();
183+
}
184+
}
181185
const auto& correlation_id = request.GetArg("correlation_id");
182186
if (!correlation_id.empty()) {
183187
envelope.correlation_id = correlation_id;
@@ -219,16 +223,17 @@ class ChaosHandler final : public server::handlers::HttpHandlerBase {
219223
}
220224

221225
std::string HandleGet() const {
222-
formats::json::ValueBuilder messages_builder;
226+
urabbitmq::HeaderValue::Builder messages_builder;
223227
for (const auto& item : consumer_.GetMessages()) {
224-
formats::json::ValueBuilder item_builder;
228+
urabbitmq::HeaderValue::Builder item_builder;
225229
item_builder["message"] = item.message;
226230
if (item.correlation_id.has_value()) {
227231
item_builder["correlation_id"] = item.correlation_id;
228232
}
229233
if (item.reply_to.has_value()) {
230234
item_builder["reply_to"] = item.reply_to;
231235
}
236+
item_builder["headers"] = item.headers;
232237
messages_builder.PushBack(std::move(item_builder));
233238
}
234239
return formats::json::ToString(messages_builder.ExtractValue());

rabbitmq/functional_tests/basic_chaos/static_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ components_manager:
1717
min_pool_size: 1
1818
max_pool_size: 1
1919
max_in_flight_requests: 5
20+
heartbeat_interval_seconds: 1
2021
use_secure_connection: false
2122

2223
secdist: {} # Component that stores configuration of hosts and passwords

rabbitmq/functional_tests/basic_chaos/tests/test_rabbitmq.py

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ async def _clear_messages(service_client):
3737
assert response.status_code == 200
3838

3939

40+
def _strip_headers(messages):
41+
return [{key: value for key, value in message.items() if key != 'headers'} for message in messages]
42+
43+
4044
async def _publish_and_consume(testpoint, client):
4145
@testpoint('message_consumed')
4246
def message_consumed(data):
@@ -51,7 +55,7 @@ def message_consumed(data):
5155
response = await client.get('/v1/messages')
5256
assert response.status_code == 200
5357

54-
assert response.json() == MESSAGES
58+
assert _strip_headers(response.json()) == MESSAGES
5559

5660

5761
async def test_rabbitmq_happy(testpoint, service_client, gate):
@@ -60,6 +64,55 @@ async def test_rabbitmq_happy(testpoint, service_client, gate):
6064
await _publish_and_consume(testpoint, service_client)
6165

6266

67+
async def test_rabbitmq_headers(testpoint, service_client, gate):
68+
await _clear_messages(service_client)
69+
70+
@testpoint('message_consumed')
71+
def message_consumed(data):
72+
pass
73+
74+
expected_headers = {
75+
'x-bool': True,
76+
'x-int': -10,
77+
'x-uint': 10,
78+
'x-double': 2.5,
79+
'x-array': [-7, 'array-value', {'enabled': False, 'nullable': None}],
80+
'x-object': {
81+
'count': 42,
82+
'name': 'nested-object',
83+
'array': [-7, 'array-value', {'enabled': False, 'nullable': None}],
84+
},
85+
'x-null': None,
86+
}
87+
88+
response = await service_client.post(
89+
'/v1/messages?message=headers&reliable=1&reply_to=reply&correlation_id=corr-id',
90+
json={'headers': expected_headers},
91+
)
92+
assert response.status_code == 200
93+
94+
await message_consumed.wait_call()
95+
96+
response = await service_client.get('/v1/messages')
97+
assert response.status_code == 200
98+
messages = response.json()
99+
assert len(messages) == 1
100+
101+
consumed = messages[0]
102+
assert consumed['message'] == 'headers'
103+
assert consumed['reply_to'] == 'reply'
104+
assert consumed['correlation_id'] == 'corr-id'
105+
assert consumed['headers']['x-bool'] is True
106+
assert consumed['headers']['x-int'] == -10
107+
assert consumed['headers']['x-uint'] == 10
108+
assert consumed['headers']['x-double'] == 2.5
109+
assert consumed['headers']['x-array'] == expected_headers['x-array']
110+
assert consumed['headers']['x-object'] == expected_headers['x-object']
111+
assert consumed['headers']['x-null'] is None
112+
assert consumed['headers']['u-trace-id']
113+
assert consumed['headers']['u-parent-span-id']
114+
115+
63116
@pytest.mark.skip(reason='std::terminate is called, fix in TAXICOMMON-6995')
64117
async def test_consumer_reconnects(testpoint, service_client, gate):
65118
await _clear_messages(service_client)
@@ -85,4 +138,42 @@ def message_consumed(data):
85138
response = await service_client.get('/v1/messages')
86139
assert response.status_code == 200
87140

88-
assert response.json() == MESSAGES
141+
assert _strip_headers(response.json()) == MESSAGES
142+
143+
144+
async def test_rabbitmq_heartbeat_reconnects(testpoint, service_client, gate):
145+
await _clear_messages(service_client)
146+
147+
@testpoint('message_consumed')
148+
def message_consumed(data):
149+
pass
150+
151+
response = await service_client.post('/v1/messages?message=before-heartbeat')
152+
assert response.status_code == 200
153+
await message_consumed.wait_call()
154+
155+
async with service_client.capture_logs(log_level='INFO') as capture:
156+
157+
@capture.subscribe(text="Consumer for queue 'chaos-queue' is broken, trying to restart")
158+
def consumer_broken(**kwargs):
159+
pass
160+
161+
@capture.subscribe(text='Restarted successfully')
162+
def consumer_restarted(**kwargs):
163+
pass
164+
165+
await gate.to_server_drop()
166+
await asyncio.sleep(3.0)
167+
await gate.to_server_pass()
168+
169+
await consumer_broken.wait_call()
170+
await consumer_restarted.wait_call()
171+
172+
response = await service_client.post('/v1/messages?message=after-heartbeat')
173+
assert response.status_code == 200
174+
175+
await message_consumed.wait_call()
176+
177+
response = await service_client.get('/v1/messages')
178+
assert response.status_code == 200
179+
assert any(message['message'] == 'after-heartbeat' for message in response.json())

rabbitmq/include/userver/urabbitmq/client_settings.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <chrono>
34
#include <cstddef>
45
#include <optional>
56
#include <string>
@@ -74,6 +75,10 @@ struct PoolSettings final {
7475
/// (tcp error/protocol error/write timeout) leads to a errors burst:
7576
/// all outstanding request will fails at once
7677
size_t max_in_flight_requests = 5;
78+
79+
/// Requested AMQP heartbeat interval in seconds.
80+
/// Set to 0 to disable heartbeats.
81+
size_t heartbeat_interval_seconds = 60;
7782
};
7883

7984
class TestsHelper;

rabbitmq/include/userver/urabbitmq/typedefs.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
/// @brief Convenient typedefs for RabbitMQ entities.
55

66
#include <chrono>
7+
#include <cstdint>
8+
#include <optional>
9+
#include <string>
10+
#include <unordered_map>
711

12+
#include <userver/formats/json/value.hpp>
813
#include <userver/utils/strong_typedef.hpp>
914

1015
USERVER_NAMESPACE_BEGIN
@@ -63,6 +68,10 @@ enum class MessageType {
6368
kTransient,
6469
};
6570

71+
/// JSON-like representation of an AMQP header value.
72+
/// This is not JSON, but a convenient tree representation for AMQP field values.
73+
using HeaderValue = formats::json::Value;
74+
6675
/// @brief Structure holding an AMQP message body along with some of its
6776
/// metadata fields. This struct is used to pass messages to the end user,
6877
/// hiding the actual AMQP message object implementation.
@@ -75,6 +84,7 @@ struct ConsumedMessage {
7584
Metadata metadata;
7685
std::optional<std::string> reply_to{};
7786
std::optional<std::string> correlation_id{};
87+
std::unordered_map<std::string, HeaderValue> headers{};
7888
};
7989

8090
/// @brief Structure holding an AMQP message body along with some of its
@@ -86,6 +96,7 @@ struct Envelope {
8696
std::optional<std::string> reply_to{};
8797
std::optional<std::string> correlation_id{};
8898
std::optional<std::chrono::milliseconds> expiration{};
99+
std::optional<std::unordered_map<std::string, HeaderValue>> headers{};
89100
};
90101

91102
} // namespace urabbitmq

0 commit comments

Comments
 (0)