Skip to content

Commit 836045c

Browse files
committed
feat core: read abs tp from the new header field
commit_hash:46ecc8318a38e8c919fc4d43b58fcb2d1bc7b5a4
1 parent d050929 commit 836045c

7 files changed

Lines changed: 221 additions & 9 deletions

File tree

.mapping.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,8 @@
652652
"core/dynamic_configs/USERVER_BAGGAGE_ENABLED.yaml":"taxi/uservices/userver/core/dynamic_configs/USERVER_BAGGAGE_ENABLED.yaml",
653653
"core/dynamic_configs/USERVER_CACHES.yaml":"taxi/uservices/userver/core/dynamic_configs/USERVER_CACHES.yaml",
654654
"core/dynamic_configs/USERVER_CANCEL_HANDLE_REQUEST_BY_DEADLINE.yaml":"taxi/uservices/userver/core/dynamic_configs/USERVER_CANCEL_HANDLE_REQUEST_BY_DEADLINE.yaml",
655+
"core/dynamic_configs/USERVER_DEADLINE_PROPAGATION_ABSOLUTE_TIMESTAMP_ENABLED.yaml":"taxi/uservices/userver/core/dynamic_configs/USERVER_DEADLINE_PROPAGATION_ABSOLUTE_TIMESTAMP_ENABLED.yaml",
656+
"core/dynamic_configs/USERVER_DEADLINE_PROPAGATION_CLOCK_SKEW_THRESHOLD_MS.yaml":"taxi/uservices/userver/core/dynamic_configs/USERVER_DEADLINE_PROPAGATION_CLOCK_SKEW_THRESHOLD_MS.yaml",
655657
"core/dynamic_configs/USERVER_DEADLINE_PROPAGATION_ENABLED.yaml":"taxi/uservices/userver/core/dynamic_configs/USERVER_DEADLINE_PROPAGATION_ENABLED.yaml",
656658
"core/dynamic_configs/USERVER_DUMPS.yaml":"taxi/uservices/userver/core/dynamic_configs/USERVER_DUMPS.yaml",
657659
"core/dynamic_configs/USERVER_FILES_CONTENT_TYPE_MAP.yaml":"taxi/uservices/userver/core/dynamic_configs/USERVER_FILES_CONTENT_TYPE_MAP.yaml",
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
default: true
2+
description: |
3+
When `false`, this service stops using the absolute instant from `X-Request-Deadline`
4+
as the local request/task deadline, even if enabled in static config. Use for emergency
5+
shutdown without restart.
6+
7+
The header may still be parsed and stored for forwarding to downstream services
8+
(see `TaskInheritedData::original_deadline`). Duration-based propagation
9+
(`X-YaTaxi-Client-TimeoutMs`) remains unaffected.
10+
schema:
11+
type: boolean
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
default: 60000
2+
description: |
3+
Maximum allowed clock skew in milliseconds between the absolute deadline
4+
timestamp and the expected value based on duration header.
5+
6+
When the detected skew exceeds this threshold, the absolute timestamp is
7+
ignored and duration-based deadline propagation is used as a fallback.
8+
9+
Set to 0 to disable the check (always trust the absolute timestamp).
10+
11+
Recommended values:
12+
- 60000 (1 minute) for well-synchronized environments (default)
13+
- 300000 (5 minutes) for less reliable environments
14+
- 0 only if you fully trust your clock synchronization
15+
schema:
16+
type: integer
17+
minimum: 0
18+
x-taxi-cpp-type: std::chrono::milliseconds

core/functional_tests/basic_chaos/static_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ components_manager:
1616
path: /chaos/httpserver
1717
task_processor: main-task-processor
1818
method: GET,DELETE,POST
19+
deadline_propagation_prefer_timestamp: true
1920

2021
handler-chaos-httpserver-parse-body-args:
2122
path: /chaos/httpserver-parse-body-args

core/functional_tests/basic_chaos/tests-deadline/test_server.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import datetime
23
from typing import Any
34

45
import pytest
@@ -11,6 +12,14 @@
1112

1213
DP_TIMEOUT_MS = 'X-YaTaxi-Client-TimeoutMs'
1314
DP_DEADLINE_EXPIRED = 'X-YaTaxi-Deadline-Expired'
15+
DP_ABSOLUTE_DEADLINE = 'X-Request-Deadline'
16+
17+
18+
def _make_iso_deadline(offset_seconds: float) -> str:
19+
tp = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
20+
seconds=offset_seconds,
21+
)
22+
return tp.strftime('%Y-%m-%dT%H:%M:%S.') + f'{tp.microsecond:06d}Z'
1423

1524

1625
@pytest.fixture(name='call')
@@ -92,6 +101,95 @@ async def test_deadline_propagation_disabled_dynamically(call):
92101
assert response.status == 200
93102

94103

104+
async def test_absolute_deadline_used(call):
105+
response = await call(
106+
headers={
107+
**HEADERS,
108+
DP_TIMEOUT_MS: '1',
109+
DP_ABSOLUTE_DEADLINE: _make_iso_deadline(5.0),
110+
},
111+
)
112+
assert response.status == 200
113+
114+
115+
async def test_absolute_deadline_expired(call):
116+
response = await call(
117+
headers={
118+
**HEADERS,
119+
DP_ABSOLUTE_DEADLINE: _make_iso_deadline(-120.0),
120+
},
121+
)
122+
_check_deadline_propagation_response(response)
123+
124+
125+
async def test_absolute_deadline_expired_with_sleep(call):
126+
response = await call(
127+
htype='sleep',
128+
headers={
129+
**HEADERS,
130+
DP_ABSOLUTE_DEADLINE: _make_iso_deadline(-120.0),
131+
},
132+
)
133+
_check_deadline_propagation_response(response)
134+
135+
136+
@pytest.mark.config(USERVER_DEADLINE_PROPAGATION_ABSOLUTE_TIMESTAMP_ENABLED=False)
137+
async def test_absolute_deadline_disabled_dynamically(call):
138+
response = await call(
139+
headers={
140+
**HEADERS,
141+
DP_TIMEOUT_MS: '5000',
142+
DP_ABSOLUTE_DEADLINE: _make_iso_deadline(-120.0),
143+
},
144+
)
145+
assert response.status == 200
146+
147+
148+
async def test_absolute_deadline_clock_skew_fallback(call):
149+
response = await call(
150+
headers={
151+
**HEADERS,
152+
DP_TIMEOUT_MS: '5000',
153+
DP_ABSOLUTE_DEADLINE: _make_iso_deadline(5.0 + 120.0),
154+
},
155+
)
156+
assert response.status == 200
157+
158+
159+
async def test_absolute_deadline_clock_skew_fallback_when_negative_skew(call):
160+
response = await call(
161+
headers={
162+
**HEADERS,
163+
DP_TIMEOUT_MS: '5000',
164+
DP_ABSOLUTE_DEADLINE: _make_iso_deadline(-120.0),
165+
},
166+
)
167+
assert response.status == 200
168+
169+
170+
@pytest.mark.config(USERVER_DEADLINE_PROPAGATION_CLOCK_SKEW_THRESHOLD_MS=0)
171+
async def test_absolute_deadline_threshold_zero_disables_skew_check(call):
172+
response = await call(
173+
headers={
174+
**HEADERS,
175+
DP_TIMEOUT_MS: '5000',
176+
DP_ABSOLUTE_DEADLINE: _make_iso_deadline(-120.0),
177+
},
178+
)
179+
_check_deadline_propagation_response(response)
180+
181+
182+
async def test_absolute_deadline_invalid_format(call):
183+
response = await call(
184+
headers={
185+
**HEADERS,
186+
DP_TIMEOUT_MS: '5000',
187+
DP_ABSOLUTE_DEADLINE: 'not-a-timestamp',
188+
},
189+
)
190+
assert response.status == 200
191+
192+
95193
async def test_cancellable(service_client, testpoint):
96194
@testpoint('testpoint_cancel')
97195
def cancel_testpoint(data):

core/library.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ configs:
1717
- USERVER_CACHES
1818
- USERVER_CANCEL_HANDLE_REQUEST_BY_DEADLINE
1919
- USERVER_DEADLINE_PROPAGATION_ENABLED
20+
- USERVER_DEADLINE_PROPAGATION_ABSOLUTE_TIMESTAMP_ENABLED
21+
- USERVER_DEADLINE_PROPAGATION_CLOCK_SKEW_THRESHOLD_MS
2022
- USERVER_DUMPS
2123
- USERVER_FILES_CONTENT_TYPE_MAP
2224
- USERVER_HTTP_PROXY

core/src/server/middlewares/deadline_propagation.cpp

Lines changed: 89 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
#include <userver/utils/from_string.hpp>
1515
#include <userver/utils/overloaded.hpp>
1616

17+
#include <userver/utils/datetime.hpp>
18+
19+
#include <dynamic_config/variables/USERVER_DEADLINE_PROPAGATION_ABSOLUTE_TIMESTAMP_ENABLED.hpp>
20+
#include <dynamic_config/variables/USERVER_DEADLINE_PROPAGATION_CLOCK_SKEW_THRESHOLD_MS.hpp>
1721
#include <dynamic_config/variables/USERVER_DEADLINE_PROPAGATION_ENABLED.hpp>
1822

1923
USERVER_NAMESPACE_BEGIN
@@ -60,6 +64,53 @@ std::optional<std::chrono::milliseconds> ParseTimeout(const http::HttpRequest& r
6064
return timeout;
6165
}
6266

67+
std::optional<request::TaskInheritedOriginalDeadline> ParseAbsoluteDeadline(const http::HttpRequest& request) {
68+
const auto& header_str = request.GetHeader(USERVER_NAMESPACE::http::headers::kXRequestDeadline);
69+
if (header_str.empty()) {
70+
return std::nullopt;
71+
}
72+
73+
LOG_DEBUG() << "Got X-Request-Deadline: " << header_str;
74+
try {
75+
const auto timestamp = utils::datetime::UtcStringtime(header_str, utils::datetime::kTaximeterFormat);
76+
return std::chrono::time_point_cast<std::chrono::microseconds>(timestamp);
77+
} catch (const std::exception& exception) {
78+
LOG_LIMITED_WARNING() << "Can't parse X-Request-Deadline from '" << header_str << "': " << exception.what();
79+
return std::nullopt;
80+
}
81+
}
82+
83+
/// Returns true if clock skew is within the threshold (absolute deadline is trustworthy).
84+
bool IsClockSkewAcceptable(
85+
const request::TaskInheritedOriginalDeadline& absolute_tp,
86+
std::chrono::milliseconds duration_timeout,
87+
std::chrono::milliseconds threshold,
88+
tracing::Span* span
89+
) {
90+
if (threshold == std::chrono::milliseconds{0}) {
91+
return true; // Check disabled
92+
}
93+
94+
const auto expected_timestamp =
95+
std::chrono::time_point_cast<std::chrono::microseconds>(utils::datetime::Now()) + duration_timeout;
96+
97+
const auto skew = std::chrono::abs(absolute_tp - expected_timestamp);
98+
const auto skew_ms = std::chrono::duration_cast<std::chrono::milliseconds>(skew);
99+
100+
if (span) {
101+
span->AddNonInheritableTag("dp_clock_skew_ms", skew_ms.count());
102+
}
103+
104+
if (skew_ms > threshold) {
105+
LOG_LIMITED_WARNING()
106+
<< "Clock skew detected: " << skew_ms << ", threshold: " << threshold
107+
<< ". Falling back to duration-based deadline propagation";
108+
return false;
109+
}
110+
111+
return true;
112+
}
113+
63114
void SetFormattedErrorResponse(http::HttpResponse& http_response, handlers::FormattedErrorData&& formatted_error_data) {
64115
http_response.SetData(std::move(formatted_error_data.external_body));
65116
if (formatted_error_data.content_type) {
@@ -153,28 +204,57 @@ void DeadlinePropagation::SetupInheritedDeadline(
153204
return;
154205
}
155206

207+
const auto original_deadline = ParseAbsoluteDeadline(request);
208+
if (original_deadline) {
209+
inherited_data.original_deadline = original_deadline;
210+
}
211+
212+
std::optional<engine::Deadline> chosen_deadline;
213+
std::int64_t span_deadline_received_ms = 0;
214+
auto* span_opt = tracing::Span::CurrentSpanUnchecked();
156215
const auto timeout = ParseTimeout(request);
157-
if (!timeout) {
158-
return;
216+
217+
if (original_deadline.has_value() && deadline_propagation_prefer_timestamp_ &&
218+
config_snapshot[::dynamic_config::USERVER_DEADLINE_PROPAGATION_ABSOLUTE_TIMESTAMP_ENABLED])
219+
{
220+
bool use_absolute = true;
221+
if (timeout.has_value()) {
222+
const auto&
223+
threshold = config_snapshot[::dynamic_config::USERVER_DEADLINE_PROPAGATION_CLOCK_SKEW_THRESHOLD_MS];
224+
use_absolute = IsClockSkewAcceptable(*original_deadline, *timeout, threshold, span_opt);
225+
}
226+
227+
if (use_absolute) {
228+
chosen_deadline = engine::Deadline::FromTimePoint(*original_deadline);
229+
230+
const auto now_sys_micro = std::chrono::time_point_cast<std::chrono::microseconds>(utils::datetime::Now());
231+
span_deadline_received_ms =
232+
std::chrono::duration_cast<std::chrono::milliseconds>(*original_deadline - now_sys_micro).count();
233+
}
234+
}
235+
236+
if (!chosen_deadline) {
237+
if (!timeout) {
238+
return;
239+
}
240+
chosen_deadline = engine::Deadline::FromTimePoint(request.GetStartTime() + *timeout);
241+
span_deadline_received_ms = timeout->count();
159242
}
160243

244+
inherited_data.deadline = *chosen_deadline;
161245
dp_scope.need_log_response = config_snapshot[::dynamic_config::USERVER_LOG_REQUEST];
162246

163-
auto* span_opt = tracing::Span::CurrentSpanUnchecked();
164247
if (span_opt) {
165-
span_opt->AddNonInheritableTag("deadline_received_ms", timeout->count());
248+
span_opt->AddNonInheritableTag("deadline_received_ms", span_deadline_received_ms);
166249
}
167250

168-
const auto deadline = engine::Deadline::FromTimePoint(request.GetStartTime() + *timeout);
169-
inherited_data.deadline = deadline;
170-
171-
if (deadline.IsSurelyReachedApprox()) {
251+
if (chosen_deadline->IsSurelyReachedApprox()) {
172252
HandleDeadlineExpired(request, dp_scope, "Immediate timeout (deadline propagation)");
173253
return;
174254
}
175255

176256
if (config_snapshot[::dynamic_config::USERVER_CANCEL_HANDLE_REQUEST_BY_DEADLINE]) {
177-
engine::current_task::SetDeadline(deadline);
257+
engine::current_task::SetDeadline(*chosen_deadline);
178258
}
179259
}
180260

0 commit comments

Comments
 (0)