Skip to content

Commit 5b8bee2

Browse files
author
aiguzhin
committed
feat ydb: get read session events in transaction
Relates:(https://nda.ya.ru/t/3Pf5rcwe7YMqT4 commit_hash:c9a8b34012a8e02410f6ca9da79aa2e37b84d836
1 parent a2298a2 commit 5b8bee2

4 files changed

Lines changed: 25 additions & 0 deletions

File tree

ydb/include/userver/ydb/topic.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ class TopicReadSession final {
4747
size_t max_size_bytes = std::numeric_limits<size_t>::max()
4848
);
4949

50+
/// @brief Get read session events
51+
///
52+
/// Waits until event occurs
53+
/// @param settings ydb native read session settings
54+
std::vector<NYdb::NTopic::TReadSessionEvent::TEvent> GetEvents(
55+
const NYdb::NTopic::TReadSessionGetEventSettings& settings
56+
);
57+
5058
/// @brief Close read session
5159
///
5260
/// Waits for all commit acknowledgments to arrive.

ydb/include/userver/ydb/transaction.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ class Transaction final {
7878
) noexcept;
7979
/// @endcond
8080

81+
/// Get native transaction
82+
/// @warning Use with care! Facilities from
83+
/// `<core/include/userver/drivers/subscribable_futures.hpp>` can help with
84+
/// non-blocking wait operations.
85+
NYdb::TTransactionBase& GetNativeTransaction();
86+
8187
private:
8288
void MarkError() noexcept;
8389
auto ErrorGuard();

ydb/src/ydb/topic.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ std::vector<NYdb::NTopic::TReadSessionEvent::TEvent> TopicReadSession::GetEvents
2525
return read_session_->GetEvents(false, max_events_count, max_size_bytes);
2626
}
2727

28+
std::vector<NYdb::NTopic::TReadSessionEvent::TEvent> TopicReadSession::GetEvents(
29+
const NYdb::NTopic::TReadSessionGetEventSettings& settings
30+
) {
31+
impl::GetFutureValue(read_session_->WaitEvent());
32+
return read_session_->GetEvents(settings);
33+
}
34+
2835
bool TopicReadSession::Close(std::chrono::milliseconds timeout) { return read_session_->Close(timeout); }
2936

3037
std::shared_ptr<NYdb::NTopic::IReadSession> TopicReadSession::GetNativeTopicReadSession() { return read_session_; }

ydb/src/ydb/transaction.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,10 @@ ExecuteResponse Transaction::Execute(
221221
return ExecuteResponse(std::move(status));
222222
}
223223

224+
NYdb::TTransactionBase& Transaction::GetNativeTransaction() {
225+
return std::visit([](auto& tx) -> NYdb::TTransactionBase& { return tx; }, ydb_tx_);
226+
}
227+
224228
} // namespace ydb
225229

226230
USERVER_NAMESPACE_END

0 commit comments

Comments
 (0)