Skip to content

Commit b9a8625

Browse files
author
brgayazov
committed
feat ydb: support retries for interactive tx
Supported @ref ydb::TableClient::RetryTx handle for execution lambda in a transaction with automatic retries. Transaction lambda accepts @ref ydb:TxActor as an argument. A return object is @ref ydb::TxAction enum, an action at end of transcaction, commit or rollback. @ref ydb:TxActor allows to execute request inside of interactive transaction via @ref ydb:TxActor::Execute handle. An example of usage of @ref ydb::TableClient::RetryTx: @code{.cpp} client.RetryTx("my_tx", {.retries = 3}, [](ydb::TxActor& tx) { tx.Execute(query, "$id", 1); return ydb::TxAction::kCommit; }); @Endcode commit_hash:d09b5ecf9d92850f15c3f918b059716c27793c76
1 parent e995f60 commit b9a8625

File tree

30 files changed

+1205
-164
lines changed

30 files changed

+1205
-164
lines changed

.mapping.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5990,6 +5990,8 @@
59905990
"ydb/functional_tests/basic/views/select-list/post/view.hpp":"taxi/uservices/userver/ydb/functional_tests/basic/views/select-list/post/view.hpp",
59915991
"ydb/functional_tests/basic/views/select-rows/post/view.cpp":"taxi/uservices/userver/ydb/functional_tests/basic/views/select-rows/post/view.cpp",
59925992
"ydb/functional_tests/basic/views/select-rows/post/view.hpp":"taxi/uservices/userver/ydb/functional_tests/basic/views/select-rows/post/view.hpp",
5993+
"ydb/functional_tests/basic/views/upsert-row-old/post/view.cpp":"taxi/uservices/userver/ydb/functional_tests/basic/views/upsert-row-old/post/view.cpp",
5994+
"ydb/functional_tests/basic/views/upsert-row-old/post/view.hpp":"taxi/uservices/userver/ydb/functional_tests/basic/views/upsert-row-old/post/view.hpp",
59935995
"ydb/functional_tests/basic/views/upsert-row/post/view.cpp":"taxi/uservices/userver/ydb/functional_tests/basic/views/upsert-row/post/view.cpp",
59945996
"ydb/functional_tests/basic/views/upsert-row/post/view.hpp":"taxi/uservices/userver/ydb/functional_tests/basic/views/upsert-row/post/view.hpp",
59955997
"ydb/functional_tests/basic/ydb/schemas/events.yaml":"taxi/uservices/userver/ydb/functional_tests/basic/ydb/schemas/events.yaml",
@@ -6048,6 +6050,8 @@
60486050
"ydb/src/ydb/impl/request_context.hpp":"taxi/uservices/userver/ydb/src/ydb/impl/request_context.hpp",
60496051
"ydb/src/ydb/impl/retry.cpp":"taxi/uservices/userver/ydb/src/ydb/impl/retry.cpp",
60506052
"ydb/src/ydb/impl/retry.hpp":"taxi/uservices/userver/ydb/src/ydb/impl/retry.hpp",
6053+
"ydb/src/ydb/impl/retry_tx.cpp":"taxi/uservices/userver/ydb/src/ydb/impl/retry_tx.cpp",
6054+
"ydb/src/ydb/impl/retry_tx.hpp":"taxi/uservices/userver/ydb/src/ydb/impl/retry_tx.hpp",
60516055
"ydb/src/ydb/impl/secdist.cpp":"taxi/uservices/userver/ydb/src/ydb/impl/secdist.cpp",
60526056
"ydb/src/ydb/impl/secdist.hpp":"taxi/uservices/userver/ydb/src/ydb/impl/secdist.hpp",
60536057
"ydb/src/ydb/impl/stats.cpp":"taxi/uservices/userver/ydb/src/ydb/impl/stats.cpp",
@@ -6068,6 +6072,7 @@
60686072
"ydb/tests/list_test.cpp":"taxi/uservices/userver/ydb/tests/list_test.cpp",
60696073
"ydb/tests/parallel_data_query_test.cpp":"taxi/uservices/userver/ydb/tests/parallel_data_query_test.cpp",
60706074
"ydb/tests/retry_test.cpp":"taxi/uservices/userver/ydb/tests/retry_test.cpp",
6075+
"ydb/tests/retry_tx_test.cpp":"taxi/uservices/userver/ydb/tests/retry_tx_test.cpp",
60716076
"ydb/tests/scan_table_test.cpp":"taxi/uservices/userver/ydb/tests/scan_table_test.cpp",
60726077
"ydb/tests/schema_test.cpp":"taxi/uservices/userver/ydb/tests/schema_test.cpp",
60736078
"ydb/tests/small_table.hpp":"taxi/uservices/userver/ydb/tests/small_table.hpp",

samples/ydb_service/views/upsert-2rows/post/view.cpp

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,29 @@ VALUES ($id_key, $name_key, $service_key, $channel_key, CurrentUtcTimestamp(), $
2727
ydb::Query::LogMode::kNameOnly,
2828
};
2929

30-
auto trx = Ydb().Begin("trx", ydb::TransactionMode::kSerializableRW);
31-
32-
for (auto i : {1, 2}) {
33-
auto response = trx.Execute(
34-
kUpsertQuery, //
35-
"$id_key",
36-
request["id"].As<std::string>() + std::to_string(i), //
37-
"$name_key",
38-
ydb::Utf8{request["name"].As<std::string>() + std::to_string(i)}, //
39-
"$service_key",
40-
request["service"].As<std::string>(), //
41-
"$channel_key",
42-
request["channel"].As<int64_t>(), //
43-
"$state_key",
44-
request["state"].As<std::optional<formats::json::Value>>() //
45-
);
46-
47-
if (response.GetCursorCount() != 0) {
48-
throw std::runtime_error("Unexpected response data");
30+
Ydb().RetryTx("trx", {.tx_mode = ydb::TransactionMode::kSerializableRW}, [&](ydb::TxActor& tx) {
31+
for (auto i : {1, 2}) {
32+
auto response = tx.Execute(
33+
kUpsertQuery, //
34+
"$id_key",
35+
request["id"].As<std::string>() + std::to_string(i), //
36+
"$name_key",
37+
ydb::Utf8{request["name"].As<std::string>() + std::to_string(i)}, //
38+
"$service_key",
39+
request["service"].As<std::string>(), //
40+
"$channel_key",
41+
request["channel"].As<int64_t>(), //
42+
"$state_key",
43+
request["state"].As<std::optional<formats::json::Value>>() //
44+
);
45+
46+
if (response.GetCursorCount() != 0) {
47+
throw std::runtime_error("Unexpected response data");
48+
}
4949
}
50-
}
5150

52-
trx.Commit();
51+
return ydb::TxAction::kCommit;
52+
});
5353

5454
return formats::json::MakeObject();
5555
}

ydb/functional_tests/basic/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ project(userver-ydb-tests-basic CXX)
22

33
add_executable(
44
${PROJECT_NAME} "ydb_service.cpp" "views/describe-table/post/view.cpp" "views/select-list/post/view.cpp"
5-
"views/select-rows/post/view.cpp" "views/upsert-row/post/view.cpp"
5+
"views/select-rows/post/view.cpp" "views/upsert-row/post/view.cpp" "views/upsert-row-old/post/view.cpp""
66
)
77
target_link_libraries(${PROJECT_NAME} userver::ydb)
88
target_include_directories(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})

ydb/functional_tests/basic/static_config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ components_manager:
9191
method: POST
9292
path: /ydb/upsert-row
9393
task_processor: main-task-processor
94+
handler-upsert-row-old:
95+
method: POST
96+
path: /ydb/upsert-row-old
97+
task_processor: main-task-processor
9498
handler-describe-table:
9599
method: POST
96100
path: /ydb/describe-table

ydb/functional_tests/basic/tests-metrics/static/metrics_values.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,32 @@ distlock.task-failures: distlock_name=sample-dist-lock RATE 0
88
distlock.task-successes: distlock_name=sample-dist-lock RATE 0
99
ydb.by-query.cancelled: ydb_database=sampledb, ydb_query=Begin RATE 0
1010
ydb.by-query.cancelled: ydb_database=sampledb, ydb_query=Commit RATE 0
11+
ydb.by-query.cancelled: ydb_database=sampledb, ydb_query=GetSession RATE 0
1112
ydb.by-query.cancelled: ydb_database=sampledb, ydb_query=UNNAMED RATE 0
1213
ydb.by-query.cancelled: ydb_database=sampledb, ydb_query=upsert-row RATE 0
1314
ydb.by-query.error: ydb_database=sampledb, ydb_query=Begin RATE 0
1415
ydb.by-query.error: ydb_database=sampledb, ydb_query=Commit RATE 0
16+
ydb.by-query.error: ydb_database=sampledb, ydb_query=GetSession RATE 0
1517
ydb.by-query.error: ydb_database=sampledb, ydb_query=UNNAMED RATE 0
1618
ydb.by-query.error: ydb_database=sampledb, ydb_query=upsert-row RATE 0
1719
ydb.by-query.success: ydb_database=sampledb, ydb_query=Begin RATE 3
1820
ydb.by-query.success: ydb_database=sampledb, ydb_query=Commit RATE 3
21+
ydb.by-query.success: ydb_database=sampledb, ydb_query=GetSession RATE 3
1922
ydb.by-query.success: ydb_database=sampledb, ydb_query=UNNAMED RATE 1
2023
ydb.by-query.success: ydb_database=sampledb, ydb_query=upsert-row RATE 3
2124
ydb.by-query.timings: ydb_database=sampledb, ydb_query=Begin HIST_RATE [5]=3,[10]=0,[20]=0,[35]=0,[60]=0,[100]=0,[173]=0,[300]=0,[520]=0,[1000]=0,[3200]=0,[10000]=0,[32000]=0,[100000]=0,[inf]=0
2225
ydb.by-query.timings: ydb_database=sampledb, ydb_query=Commit HIST_RATE [5]=3,[10]=0,[20]=0,[35]=0,[60]=0,[100]=0,[173]=0,[300]=0,[520]=0,[1000]=0,[3200]=0,[10000]=0,[32000]=0,[100000]=0,[inf]=0
26+
ydb.by-query.timings: ydb_database=sampledb, ydb_query=GetSession HIST_RATE [5]=3,[10]=0,[20]=0,[35]=0,[60]=0,[100]=0,[173]=0,[300]=0,[520]=0,[1000]=0,[3200]=0,[10000]=0,[32000]=0,[100000]=0,[inf]=0
2327
ydb.by-query.timings: ydb_database=sampledb, ydb_query=UNNAMED HIST_RATE [5]=0,[10]=0,[20]=0,[35]=0,[60]=0,[100]=1,[173]=0,[300]=0,[520]=0,[1000]=0,[3200]=0,[10000]=0,[32000]=0,[100000]=0,[inf]=0
2428
ydb.by-query.timings: ydb_database=sampledb, ydb_query=upsert-row HIST_RATE [5]=3,[10]=0,[20]=0,[35]=0,[60]=0,[100]=0,[173]=0,[300]=0,[520]=0,[1000]=0,[3200]=0,[10000]=0,[32000]=0,[100000]=0,[inf]=0
2529
ydb.by-query.total: ydb_database=sampledb, ydb_query=Begin RATE 3
2630
ydb.by-query.total: ydb_database=sampledb, ydb_query=Commit RATE 3
31+
ydb.by-query.total: ydb_database=sampledb, ydb_query=GetSession RATE 3
2732
ydb.by-query.total: ydb_database=sampledb, ydb_query=UNNAMED RATE 1
2833
ydb.by-query.total: ydb_database=sampledb, ydb_query=upsert-row RATE 3
2934
ydb.by-query.transport-error: ydb_database=sampledb, ydb_query=Begin RATE 0
3035
ydb.by-query.transport-error: ydb_database=sampledb, ydb_query=Commit RATE 0
36+
ydb.by-query.transport-error: ydb_database=sampledb, ydb_query=GetSession RATE 0
3137
ydb.by-query.transport-error: ydb_database=sampledb, ydb_query=UNNAMED RATE 0
3238
ydb.by-query.transport-error: ydb_database=sampledb, ydb_query=upsert-row RATE 0
3339
ydb.by-transaction.cancelled: ydb_database=sampledb, ydb_transaction=trx RATE 0

ydb/functional_tests/basic/tests/test_deadline_propagation.py

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@
1010
'channel': 123,
1111
}
1212

13-
QUERY_NAMES = ('Begin', 'upsert-row', 'Commit')
13+
QUERY_NAMES = ('GetSession', 'Begin', 'upsert-row', 'Commit')
14+
QUERY_NAMES_OLD = ('Begin', 'upsert-row', 'Commit')
1415

1516

1617
def assert_deadline_timeout(
1718
capture,
1819
*,
19-
query_names=QUERY_NAMES,
20+
query_names,
2021
expect_dp_enabled: bool = True,
2122
):
2223
for query in query_names:
@@ -28,46 +29,58 @@ def assert_deadline_timeout(
2829
assert 'deadline_timeout_ms' not in logs[0]
2930

3031

31-
async def test_on(service_client):
32+
def get_query_names(handler):
33+
return QUERY_NAMES if handler == 'upsert-row' else QUERY_NAMES_OLD
34+
35+
36+
@pytest.mark.parametrize('handler', ['upsert-row', 'upsert-row-old'])
37+
async def test_on(service_client, handler):
3238
async with service_client.capture_logs() as capture:
3339
response = await service_client.post(
34-
'ydb/upsert-row',
40+
f'ydb/{handler}',
3541
headers={DP_TIMEOUT_MS: TIMEOUT},
3642
json=JSON,
3743
)
3844
assert response.status_code == 200
3945

40-
assert_deadline_timeout(capture)
46+
assert_deadline_timeout(capture, query_names=get_query_names(handler))
4147

4248

43-
async def test_triggered(service_client):
49+
@pytest.mark.parametrize('handler', ['upsert-row-old'])
50+
async def test_triggered_old(service_client, handler):
4451
async with service_client.capture_logs() as capture:
4552
response = await service_client.post(
46-
'ydb/upsert-row',
53+
f'ydb/{handler}',
4754
headers={DP_TIMEOUT_MS: '5'},
4855
json=JSON,
4956
)
5057
assert response.status_code == 498
5158

52-
assert_deadline_timeout(capture, query_names=['Begin'])
59+
if handler == 'upsert-row-old':
60+
assert_deadline_timeout(capture, query_names=['Begin'])
61+
assert len(capture.select(stopwatch_name='ydb_query')) == 1
62+
else:
63+
assert len(capture.select(stopwatch_name='ydb_query')) == 0
5364

5465

5566
@pytest.mark.config(YDB_DEADLINE_PROPAGATION_VERSION=0)
56-
async def test_config_disabled(service_client):
67+
@pytest.mark.parametrize('handler', ['upsert-row', 'upsert-row-old'])
68+
async def test_config_disabled(service_client, handler):
5769
async with service_client.capture_logs() as capture:
5870
response = await service_client.post(
59-
'ydb/upsert-row',
71+
f'ydb/{handler}',
6072
headers={DP_TIMEOUT_MS: TIMEOUT},
6173
json=JSON,
6274
)
6375
assert response.status_code == 200
6476

65-
assert_deadline_timeout(capture, expect_dp_enabled=False)
77+
assert_deadline_timeout(capture, query_names=get_query_names(handler), expect_dp_enabled=False)
6678

6779

68-
async def test_off(service_client):
80+
@pytest.mark.parametrize('handler', ['upsert-row', 'upsert-row-old'])
81+
async def test_off(service_client, handler):
6982
async with service_client.capture_logs() as capture:
70-
response = await service_client.post('ydb/upsert-row', json=JSON)
83+
response = await service_client.post(f'ydb/{handler}', json=JSON)
7184
assert response.status_code == 200
7285

73-
assert_deadline_timeout(capture, expect_dp_enabled=False)
86+
assert_deadline_timeout(capture, query_names=get_query_names(handler), expect_dp_enabled=False)

ydb/functional_tests/basic/tests/test_operation_settings.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import pytest
2+
3+
14
async def test_get_settings_from_user_code(service_client):
25
async with service_client.capture_logs() as capture:
36
response = await service_client.post(
@@ -20,10 +23,11 @@ async def test_get_settings_from_user_code(service_client):
2023
)
2124

2225

23-
async def test_get_settings_from_static_config(service_client):
26+
@pytest.mark.parametrize('handler', ['upsert-row', 'upsert-row-old'])
27+
async def test_get_settings_from_static_config(service_client, handler):
2428
async with service_client.capture_logs() as capture:
2529
response = await service_client.post(
26-
'ydb/upsert-row',
30+
f'ydb/{handler}',
2731
json={
2832
'id': 'id-upsert',
2933
'name': 'name-upsert',
@@ -34,13 +38,21 @@ async def test_get_settings_from_static_config(service_client):
3438
assert response.status_code == 200
3539
assert response.json() == {}
3640

37-
assert capture.select(
38-
link=response.headers['x-yarequestid'],
39-
stopwatch_name='ydb_query',
40-
max_retries='2',
41-
get_session_timeout_ms='5001',
42-
client_timeout_ms='1101',
43-
)
41+
if handler == 'upsert-row':
42+
assert capture.select(
43+
link=response.headers['x-yarequestid'],
44+
stopwatch_name='ydb_query',
45+
max_retries='2',
46+
client_timeout_ms='1101',
47+
)
48+
else:
49+
assert capture.select(
50+
link=response.headers['x-yarequestid'],
51+
stopwatch_name='ydb_query',
52+
max_retries='2',
53+
get_session_timeout_ms='5001',
54+
client_timeout_ms='1101',
55+
)
4456

4557

4658
async def test_get_settings_from_dynamic_config(

ydb/functional_tests/basic/tests/test_upsert.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
async def test_upsert_row(service_client, ydb):
1+
import pytest
2+
3+
4+
@pytest.mark.parametrize('handler', ['upsert-row', 'upsert-row-old'])
5+
async def test_upsert_row(service_client, ydb, handler):
26
response = await service_client.post(
3-
'ydb/upsert-row',
7+
f'ydb/{handler}',
48
json={
59
'id': 'id-upsert',
610
'name': 'name-upsert',
@@ -26,10 +30,11 @@ async def test_upsert_row(service_client, ydb):
2630
}
2731

2832

29-
async def test_trx_force_failure(service_client, ydb, userver_ydb_trx):
33+
@pytest.mark.parametrize('handler', ['upsert-row', 'upsert-row-old'])
34+
async def test_trx_force_failure(service_client, ydb, userver_ydb_trx, handler):
3035
userver_ydb_trx.enable_failure('trx')
3136
response = await service_client.post(
32-
'ydb/upsert-row',
37+
f'ydb/{handler}',
3338
json={
3439
'id': 'id-upsert',
3540
'name': 'name-upsert',
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#include "view.hpp"
2+
3+
#include <userver/engine/sleep.hpp>
4+
#include <userver/formats/json.hpp>
5+
#include <userver/logging/log.hpp>
6+
#include <userver/utest/using_namespace_userver.hpp>
7+
#include <userver/utils/datetime.hpp>
8+
9+
#include <userver/ydb/table.hpp>
10+
11+
namespace {
12+
13+
const ydb::Query kUpsertQuery{
14+
R"(
15+
--!syntax_v1
16+
DECLARE $id_key AS String;
17+
DECLARE $name_key AS Utf8;
18+
DECLARE $service_key AS String;
19+
DECLARE $channel_key AS Int64;
20+
DECLARE $state_key AS Json?;
21+
22+
UPSERT INTO events (id, name, service, channel, created, state)
23+
VALUES ($id_key, $name_key, $service_key, $channel_key, CurrentUtcTimestamp(), $state_key);
24+
)",
25+
ydb::Query::Name{"upsert-row"},
26+
};
27+
28+
} // namespace
29+
30+
namespace sample {
31+
32+
formats::json::
33+
Value
34+
UpsertRowOldHandler::
35+
HandleRequestJsonThrow(const server::http::HttpRequest&, const formats::json::Value& request, server::request::RequestContext&) const {
36+
engine::SleepFor(std::chrono::milliseconds(10));
37+
38+
auto trx = Ydb().Begin("trx", ydb::TransactionMode::kSerializableRW);
39+
auto response = trx.Execute(
40+
kUpsertQuery, //
41+
"$id_key",
42+
request["id"].As<std::string>(), //
43+
"$name_key",
44+
ydb::Utf8{request["name"].As<std::string>()}, //
45+
"$service_key",
46+
request["service"].As<std::string>(), //
47+
"$channel_key",
48+
request["channel"].As<int64_t>(), //
49+
"$state_key",
50+
request["state"].As<std::optional<formats::json::Value>>() //
51+
);
52+
53+
if (response.GetCursorCount()) {
54+
throw std::runtime_error("Unexpected response data");
55+
}
56+
57+
trx.Commit();
58+
59+
return formats::json::MakeObject();
60+
}
61+
62+
} // namespace sample
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#pragma once
2+
3+
#include <views/base_handler.hpp>
4+
5+
namespace sample {
6+
7+
class UpsertRowOldHandler final : public BaseHandler {
8+
public:
9+
static constexpr std::string_view kName = "handler-upsert-row-old";
10+
11+
using BaseHandler::BaseHandler;
12+
13+
formats::json::Value HandleRequestJsonThrow(
14+
const server::http::HttpRequest& request,
15+
const formats::json::Value& request_json,
16+
server::request::RequestContext& context
17+
) const override;
18+
};
19+
20+
} // namespace sample

0 commit comments

Comments
 (0)