Skip to content

Commit 2f94c1e

Browse files
disaykinapolukhin
authored andcommitted
feat kafka: add bulk send producer method
Добавлен метод `Producer::Send` для синхронной отправки пачки сообщений в указанный топик Kafka. В случае ошибки отправки выкидывается исключение `BulkSendException`, которое содержит подробности об ошибках отправки сообщений из пачки. `Producer::Send` пытается обработать ошибки переполнения локальной очереди librdkafka в пределах настроенного `delivery.timeout.ms`. В отличие от старых методов `Send` и `SendAsync` метод порождает только одну корутину на всю пачку сообщений. Tests: протестировано CI --- Pull Request resolved: #1134 Co-authored-by: antoshkka <antoshkka@userver.tech> Co-authored-by: antoshkka <antoshkka@userver.tech> Co-authored-by: antoshkka <antoshkka@userver.tech> commit_hash:8c87611375eb1d0a8a8f7a67731e48d91f023beb
1 parent 6f67b33 commit 2f94c1e

File tree

10 files changed

+445
-68
lines changed

10 files changed

+445
-68
lines changed

.mapping.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2778,6 +2778,7 @@
27782778
"kafka/include/userver/kafka/impl/consumer.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/impl/consumer.hpp",
27792779
"kafka/include/userver/kafka/impl/consumer_params.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/impl/consumer_params.hpp",
27802780
"kafka/include/userver/kafka/impl/holders.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/impl/holders.hpp",
2781+
"kafka/include/userver/kafka/impl/messages.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/impl/messages.hpp",
27812782
"kafka/include/userver/kafka/impl/stats.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/impl/stats.hpp",
27822783
"kafka/include/userver/kafka/message.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/message.hpp",
27832784
"kafka/include/userver/kafka/offset_range.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/offset_range.hpp",

kafka/include/userver/kafka/exceptions.hpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#pragma once
22

33
#include <cstdint>
4+
#include <exception>
5+
#include <map>
46
#include <stdexcept>
57
#include <string_view>
68

@@ -27,6 +29,29 @@ class SendException : public std::runtime_error {
2729
const bool is_retryable_{false};
2830
};
2931

32+
/// @brief Base exception thrown by Producer::Send in bulk mode
33+
/// in case of one or more send errors.
34+
class BulkSendException : public std::runtime_error {
35+
static constexpr const char* kWhat{"Some messages was not delivered."};
36+
37+
public:
38+
using ExceptionMap = std::map<std::size_t, std::exception_ptr>;
39+
40+
explicit BulkSendException(ExceptionMap nested_exceptions);
41+
42+
/// @return nested errors.
43+
/// Nested exceptions are subclasses of SendException.
44+
const ExceptionMap& GetExceptions() const noexcept;
45+
46+
private:
47+
/// @brief A mapping from the message's index in the bulk send operation
48+
/// to the exception that occurred during its delivering.
49+
/// @details Key: 0-based index of the element in the input batch.
50+
/// Value: Pointer to the exception.
51+
/// @note Contains only indices that resulted in an error.
52+
const ExceptionMap nested_exceptions_;
53+
};
54+
3055
class DeliveryTimeoutException final : public SendException {
3156
static constexpr const char* kWhat{
3257
"Message is not delivered after `delivery_timeout` milliseconds. Hint: "
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <string_view>
5+
#include <type_traits>
6+
7+
USERVER_NAMESPACE_BEGIN
8+
9+
namespace kafka::impl {
10+
11+
/// @brief Message collection adapter.
12+
class Messages {
13+
public:
14+
virtual std::size_t Size() const noexcept = 0;
15+
virtual std::string_view operator[](std::size_t index) const noexcept = 0;
16+
17+
protected:
18+
~Messages() = default;
19+
};
20+
21+
template <typename Container>
22+
class MessagesAdapter final : public Messages {
23+
public:
24+
static_assert(
25+
std::is_convertible_v<decltype(std::declval<const Container&>()[0]), std::string_view>,
26+
"Container must support operator[] and return a type convertible to std::string_view"
27+
);
28+
29+
static_assert(
30+
std::is_integral_v<decltype(std::declval<const Container&>().size())>,
31+
"Container must support method size() and return an integral type"
32+
);
33+
34+
explicit MessagesAdapter(const Container& data) noexcept : data_{data} {}
35+
36+
std::size_t Size() const noexcept override { return data_.size(); }
37+
std::string_view operator[](std::size_t index) const noexcept override { return data_[index]; }
38+
39+
private:
40+
const Container& data_;
41+
};
42+
43+
} // namespace kafka::impl
44+
45+
USERVER_NAMESPACE_END

kafka/include/userver/kafka/producer.hpp

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
#pragma once
22

33
#include <cstdint>
4+
#include <type_traits>
5+
#include <utility>
46

57
#include <userver/engine/task/task_processor_fwd.hpp>
68
#include <userver/engine/task/task_with_result.hpp>
79
#include <userver/kafka/exceptions.hpp>
810
#include <userver/kafka/headers.hpp>
11+
#include <userver/kafka/impl/messages.hpp>
912
#include <userver/utils/fast_pimpl.hpp>
1013
#include <userver/utils/statistics/writer.hpp>
1114

@@ -81,26 +84,20 @@ class Producer final {
8184

8285
/// @brief Sends given message to topic `topic_name` by given `key`
8386
/// and `partition` (if passed) with payload contains the `message`
84-
/// data. Asynchronously waits until the message is delivered or the delivery
85-
/// error occurred.
87+
/// data. Asynchronously waits until the message is delivered or the delivery error occurred.
8688
///
87-
/// No payload data is copied. Method holds the data until message is
88-
/// delivered.
89+
/// No payload data is copied. Method holds the data until message is delivered.
8990
///
90-
/// Thread-safe and can be called from any number of threads
91-
/// concurrently.
91+
/// Thread-safe and can be called from any number of threads concurrently.
9292
///
93-
/// If `partition` not passed, partition is chosen by internal
94-
/// Kafka partitioner.
93+
/// If `partition` not passed, partition is chosen by internal Kafka partitioner.
9594
///
96-
/// @warning if `enable_idempotence` option is enabled, do not use both
97-
/// explicit partitions and Kafka-chosen ones.
95+
/// @warning if `enable_idempotence` option is enabled, do not use both explicit partitions and Kafka-chosen ones.
9896
///
9997
/// @throws SendException and its descendants if message is not delivered
10098
/// and acked by Kafka Broker in configured timeout.
10199
///
102-
/// @note Use SendException::IsRetryable method to understand whether there is
103-
/// a sense to retry the message sending.
100+
/// @note Use SendException::IsRetryable method to understand whether it makes sense to retry the message sending.
104101
/// @snippet kafka/tests/producer_kafkatest.cpp Producer retryable error
105102
void Send(
106103
utils::zstring_view topic_name,
@@ -110,11 +107,40 @@ class Producer final {
110107
HeaderViews headers = {}
111108
) const;
112109

110+
/// @brief Sends given messages to topic `topic_name` by given `key`
111+
/// and `partition` (if passed) with payload contains the `messages`
112+
/// data. Asynchronously waits until the messages are delivered or the delivery error occurred.
113+
///
114+
/// No payload data is copied. Method holds the data until messages are delivered.
115+
///
116+
/// Thread-safe and can be called from any number of threads concurrently.
117+
///
118+
/// If `partition` not passed, partition is chosen by internal Kafka partitioner.
119+
///
120+
/// @warning if `enable_idempotence` option is enabled, do not use both explicit partitions and Kafka-chosen ones.
121+
///
122+
/// @throws BulkSendException if some messages were not delivered
123+
/// and acked by Kafka Broker in configured timeout.
124+
///
125+
/// @note Use BulkSendException::GetExceptions method to get the map of nested exceptions that occurred.
126+
template <typename Messages>
127+
std::enable_if_t<
128+
std::is_convertible_v<decltype(std::declval<const Messages&>()[0]), std::string_view> &&
129+
std::is_integral_v<decltype(std::declval<const Messages&>().size())> >
130+
Send(
131+
utils::zstring_view topic_name,
132+
std::string_view key,
133+
const Messages& messages,
134+
std::optional<std::uint32_t> partition = kUnassignedPartition,
135+
HeaderViews headers = {}
136+
) const {
137+
SendWrapper(topic_name, key, impl::MessagesAdapter{messages}, partition, std::move(headers));
138+
}
139+
113140
/// @brief Same as Producer::Send, but returns the task which can be
114141
/// used to wait the message delivery manually.
115142
///
116-
/// @warning If user schedules a batch of send requests with
117-
/// Producer::SendAsync, some send
143+
/// @warning If user schedules a batch of send requests with Producer::SendAsync, some send
118144
/// requests may be retried by the library (for instance, in case of network
119145
/// blink). Though, the order messages are written to partition may differ
120146
/// from the order messages are initially sent
@@ -141,6 +167,22 @@ class Producer final {
141167
impl::HeadersHolder&& headers_holder
142168
) const;
143169

170+
void SendImpl(
171+
utils::zstring_view topic_name,
172+
std::string_view key,
173+
const impl::Messages& messages,
174+
std::optional<std::uint32_t> partition,
175+
std::vector<impl::HeadersHolder>&& headers_holders
176+
) const;
177+
178+
void SendWrapper(
179+
utils::zstring_view topic_name,
180+
std::string_view key,
181+
const impl::Messages& messages,
182+
std::optional<std::uint32_t> partition,
183+
HeaderViews headers
184+
) const;
185+
144186
const std::string name_;
145187
engine::TaskProcessor& producer_task_processor_;
146188

kafka/src/kafka/exceptions.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ SendException::SendException(const char* what, bool is_retryable)
1313
is_retryable_(is_retryable)
1414
{}
1515

16+
BulkSendException::BulkSendException(BulkSendException::ExceptionMap nested_exceptions)
17+
: std::runtime_error(kWhat),
18+
nested_exceptions_(std::move(nested_exceptions))
19+
{}
20+
21+
const BulkSendException::ExceptionMap& BulkSendException::GetExceptions() const noexcept { return nested_exceptions_; }
22+
1623
DeliveryTimeoutException::DeliveryTimeoutException()
1724
: SendException(kWhat, /*is_retryable=*/true)
1825
{}

kafka/src/kafka/impl/producer_impl.cpp

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -138,20 +138,56 @@ DeliveryResult ProducerImpl::Send(
138138
HeadersHolder headers_holder
139139
) const {
140140
LOG(operation_log_level_) << fmt::format("Message to topic '{}' is requested to send", topic_name);
141+
auto deadline = engine::Deadline::FromDuration(delivery_timeout_);
141142
auto delivery_result_future =
142-
ScheduleMessageDelivery(topic_name, key, message, partition, std::move(headers_holder));
143+
ScheduleMessageDelivery(topic_name, key, message, partition, std::move(headers_holder), deadline);
143144

144145
WaitUntilDeliveryReported(delivery_result_future);
145146

146147
return delivery_result_future.get();
147148
}
148149

150+
std::vector<DeliveryResult> ProducerImpl::Send(
151+
utils::zstring_view topic_name,
152+
std::string_view key,
153+
const Messages& messages,
154+
std::optional<std::uint32_t> partition,
155+
std::vector<HeadersHolder> headers_holders
156+
) const {
157+
UASSERT(messages.Size() == headers_holders.size());
158+
159+
LOG(operation_log_level_
160+
) << fmt::format("Messages {} to topic '{}' are requested to send", messages.Size(), topic_name);
161+
162+
std::vector<engine::Future<DeliveryResult>> delivery_result_futures;
163+
delivery_result_futures.reserve(messages.Size());
164+
165+
auto deadline = engine::Deadline::FromDuration(delivery_timeout_);
166+
for (std::size_t i = 0; i < messages.Size(); ++i) {
167+
delivery_result_futures.emplace_back(
168+
ScheduleMessageDelivery(topic_name, key, messages[i], partition, std::move(headers_holders[i]), deadline)
169+
);
170+
}
171+
172+
std::vector<DeliveryResult> delivery_results;
173+
delivery_results.reserve(messages.Size());
174+
175+
for (auto& delivery_result_future : delivery_result_futures) {
176+
WaitUntilDeliveryReported(delivery_result_future);
177+
178+
delivery_results.emplace_back(delivery_result_future.get());
179+
}
180+
181+
return delivery_results;
182+
}
183+
149184
engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
150185
utils::zstring_view topic_name,
151186
std::string_view key,
152187
std::string_view message,
153188
std::optional<std::uint32_t> partition,
154-
HeadersHolder headers_holder
189+
HeadersHolder headers_holder,
190+
engine::Deadline deadline
155191
) const {
156192
auto waiter = std::make_unique<DeliveryWaiter>();
157193
auto wait_handle = waiter->GetFuture();
@@ -181,34 +217,42 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
181217
///
182218
/// Headers holder **must** be released if `rd_kafka_producev` succeeded.
183219

220+
while (!deadline.IsReached() && !engine::current_task::ShouldCancel()) {
184221
#ifdef __clang__
185222
#pragma clang diagnostic push
186223
#pragma clang diagnostic ignored "-Wgnu-statement-expression"
187224
#endif
188-
// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks,cppcoreguidelines-pro-type-const-cast)
189-
const rd_kafka_resp_err_t enqueue_error = rd_kafka_producev(
190-
producer_.GetHandle(),
191-
RD_KAFKA_V_TOPIC(topic_name.c_str()),
192-
RD_KAFKA_V_KEY(key.data(), key.size()),
193-
RD_KAFKA_V_VALUE(const_cast<char*>(message.data()), message.size()),
194-
RD_KAFKA_V_MSGFLAGS(0),
195-
RD_KAFKA_V_HEADERS(headers_holder.GetHandle()),
196-
RD_KAFKA_V_PARTITION(partition.value_or(RD_KAFKA_PARTITION_UA)),
197-
RD_KAFKA_V_OPAQUE(waiter.get()),
198-
RD_KAFKA_V_END
199-
);
200-
// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks,cppcoreguidelines-pro-type-const-cast)
225+
// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks,cppcoreguidelines-pro-type-const-cast)
226+
const rd_kafka_resp_err_t enqueue_error = rd_kafka_producev(
227+
producer_.GetHandle(),
228+
RD_KAFKA_V_TOPIC(topic_name.c_str()),
229+
RD_KAFKA_V_KEY(key.data(), key.size()),
230+
RD_KAFKA_V_VALUE(const_cast<char*>(message.data()), message.size()),
231+
RD_KAFKA_V_MSGFLAGS(0),
232+
RD_KAFKA_V_HEADERS(headers_holder.GetHandle()),
233+
RD_KAFKA_V_PARTITION(partition.value_or(RD_KAFKA_PARTITION_UA)),
234+
RD_KAFKA_V_OPAQUE(waiter.get()),
235+
RD_KAFKA_V_END
236+
);
237+
// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks,cppcoreguidelines-pro-type-const-cast)
201238

202239
#ifdef __clang__
203240
#pragma clang diagnostic pop
204241
#endif
205242

206-
if (enqueue_error == RD_KAFKA_RESP_ERR_NO_ERROR) {
207-
[[maybe_unused]] const auto headers_holder_ptr = headers_holder.release();
208-
[[maybe_unused]] const auto waiter_ptr = waiter.release();
209-
} else {
210-
LOG_WARNING("Failed to enqueue message to Kafka local queue: {}", rd_kafka_err2str(enqueue_error));
211-
waiter->SetDeliveryResult(DeliveryResult{enqueue_error});
243+
if (enqueue_error == RD_KAFKA_RESP_ERR_NO_ERROR) {
244+
[[maybe_unused]] const auto headers_holder_ptr = headers_holder.release();
245+
[[maybe_unused]] const auto waiter_ptr = waiter.release();
246+
} else if (enqueue_error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
247+
LOG_LIMITED_WARNING("Kafka local queue is full");
248+
/// waiting for a while for the queue to clear up
249+
engine::Yield();
250+
continue;
251+
} else {
252+
LOG_WARNING("Failed to enqueue message to Kafka local queue: {}", rd_kafka_err2str(enqueue_error));
253+
waiter->SetDeliveryResult(DeliveryResult{enqueue_error});
254+
}
255+
break;
212256
}
213257

214258
return wait_handle;

kafka/src/kafka/impl/producer_impl.hpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
#include <librdkafka/rdkafka.h>
88

9+
#include <userver/engine/deadline.hpp>
10+
#include <userver/kafka/impl/messages.hpp>
911
#include <userver/kafka/impl/stats.hpp>
1012
#include <userver/logging/level.hpp>
1113
#include <userver/utils/periodic_task.hpp>
@@ -40,6 +42,16 @@ class ProducerImpl final {
4042
HeadersHolder headers
4143
) const;
4244

45+
/// @brief Sends messages and waits for their delivery.
46+
/// While waiting, handles other messages' delivery reports, errors and logs.
47+
[[nodiscard]] std::vector<DeliveryResult> Send(
48+
utils::zstring_view topic_name,
49+
std::string_view key,
50+
const Messages& messages,
51+
std::optional<std::uint32_t> partition,
52+
std::vector<HeadersHolder> headers
53+
) const;
54+
4355
/// @brief Waits until scheduled messages are delivered for
4456
/// at most 2 x `delivery_timeout`.
4557
///
@@ -58,7 +70,8 @@ class ProducerImpl final {
5870
std::string_view key,
5971
std::string_view message,
6072
std::optional<std::uint32_t> partition,
61-
HeadersHolder headers
73+
HeadersHolder headers,
74+
engine::Deadline deadline
6275
) const;
6376

6477
/// @brief Poll a delivery or error event from producer's queue.

0 commit comments

Comments
 (0)