Skip to content

Commit 8b49570

Browse files
committed
feat kafka: retry flapping check
Tests: протестировано CI commit_hash:ab4bb4aea109a213f57b145901b1f908ac852095
1 parent f23af2b commit 8b49570

1 file changed

Lines changed: 27 additions & 19 deletions

File tree

kafka/tests/producer_kafkatest.cpp

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -220,31 +220,39 @@ UTEST_F(ProducerTest, MessageTimeout) {
220220
producer_configuration.queue_buffering_max = std::chrono::milliseconds{0};
221221

222222
auto producer = MakeProducer("kafka-producer", producer_configuration);
223-
const std::string topic = GenerateTopic();
224223

225-
/// [Producer retryable error]
226-
std::vector<engine::TaskWithResult<void>> results;
227-
results.reserve(kMaxQueueMessages);
228-
for (std::uint32_t send{0}; send < kMaxQueueMessages; ++send) {
229-
results.push_back(producer.SendAsync(topic, fmt::format("test-key-{}", send), fmt::format("test-{}", send)));
230-
}
224+
constexpr std::size_t kMaxRetries = 10;
225+
for (std::size_t i = 0; i < kMaxRetries; ++i) {
226+
const std::string topic = GenerateTopic();
231227

232-
std::vector<std::uint32_t> sends_to_retry;
233-
for (std::uint32_t send{0}; send < kMaxQueueMessages; ++send) {
234-
try {
235-
results[send].Get();
236-
} catch (const kafka::SendException& e) {
237-
if (e.IsRetryable()) {
238-
// Probabl issues with network and reached `delivery_timeout`, retry
239-
sends_to_retry.push_back(send);
240-
} else {
241-
// LOG ...
228+
/// [Producer retryable error]
229+
std::vector<engine::TaskWithResult<void>> results;
230+
results.reserve(kMaxQueueMessages);
231+
for (std::uint32_t send{0}; send < kMaxQueueMessages; ++send) {
232+
results.push_back(producer.SendAsync(topic, fmt::format("test-key-{}", send), std::to_string(send)));
233+
}
234+
235+
std::vector<std::uint32_t> sends_to_retry;
236+
for (std::uint32_t send{0}; send < kMaxQueueMessages; ++send) {
237+
try {
238+
results[send].Get();
239+
} catch (const kafka::SendException& e) {
240+
if (e.IsRetryable()) {
241+
// Probabl issues with network and reached `delivery_timeout`, retry
242+
sends_to_retry.push_back(send);
243+
} else {
244+
// LOG ...
245+
}
242246
}
243247
}
248+
/// [Producer retryable error]
249+
250+
if (!sends_to_retry.empty()) {
251+
return;
252+
}
244253
}
245-
/// [Producer retryable error]
246254

247-
EXPECT_GT(sends_to_retry.size(), 0); // remove the check if the CI is too fast and the check flaps in tests
255+
FAIL() << "Failed to trigger failures after " << kMaxRetries << " retries.";
248256
}
249257

250258
UTEST_F(ProducerTest, FullQueueBulk) {

0 commit comments

Comments
 (0)