Skip to content

Commit 5a9e707

Browse files
author
lexeyo
committed
feat grpc: add graceful shutdown headers to trailing metadata if graceful shutdown started after initial metadata sending
commit_hash:9c719dac7c45a0c977b598a5c05049d66f734aca
1 parent da86429 commit 5a9e707

3 files changed

Lines changed: 121 additions & 13 deletions

File tree

grpc/functional_tests/middleware_server/tests/test_graceful_shutdown_headers.py

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from signal import SIGTERM
23

34
import pytest
@@ -20,8 +21,7 @@ async def test_graceful_shutdown_headers(service_daemon_instance, grpc_client, s
2021
]
2122

2223
for headers_enabled, headers in params:
23-
dynamic_config.set_values({'GRACEFUL_SHUTDOWN_HEADERS': {'enabled': headers_enabled, 'headers': headers}})
24-
await service_client.update_server_state()
24+
await update_graceful_shutdown_headers(service_client, dynamic_config, headers_enabled, headers)
2525

2626
request = greeter_protos.GreetingRequest(name='Python')
2727
call = grpc_client.SayHello(request)
@@ -33,8 +33,7 @@ async def test_graceful_shutdown_headers(service_daemon_instance, grpc_client, s
3333
service_daemon_instance.process.send_signal(SIGTERM)
3434

3535
for headers_enabled, headers in params:
36-
dynamic_config.set_values({'GRACEFUL_SHUTDOWN_HEADERS': {'enabled': headers_enabled, 'headers': headers}})
37-
await service_client.update_server_state()
36+
await update_graceful_shutdown_headers(service_client, dynamic_config, headers_enabled, headers)
3837

3938
request = greeter_protos.GreetingRequest(name='Python')
4039
call = grpc_client.SayHello(request)
@@ -52,6 +51,83 @@ async def test_graceful_shutdown_headers(service_daemon_instance, grpc_client, s
5251
service_daemon_instance.process.wait()
5352

5453

54+
@pytest.mark.uservice_oneshot
55+
async def test_graceful_shutdown_headers_streams(service_daemon_instance, grpc_client, service_client, dynamic_config):
56+
headers = {'x-envoy-immediate-health-check-fail': ['true']}
57+
await update_graceful_shutdown_headers(service_client, dynamic_config, True, headers)
58+
59+
start = f'Python{MD_ONE_REQ}{MD_TWO_REQ}'
60+
end = f'{MD_TWO_RES}{MD_ONE_RES}'
61+
62+
service_daemon_instance.process.send_signal(SIGTERM)
63+
64+
call = grpc_client.SayHelloStreams(
65+
_prepare_requests(['Python', '!', '!', '!'], 0.1),
66+
wait_for_ready=True,
67+
)
68+
async for response in call:
69+
assert response.greeting == f'Hello, {start}{end}'
70+
start += f'!{MD_ONE_REQ}{MD_TWO_REQ}'
71+
72+
check_present(await call.initial_metadata(), headers)
73+
check_not_present(await call.trailing_metadata(), headers)
74+
75+
# After a couple more seconds, the service will start shutting down.
76+
service_daemon_instance.process.wait()
77+
78+
79+
@pytest.mark.uservice_oneshot
80+
async def test_late_graceful_shutdown_headers_streams(
81+
service_daemon_instance, grpc_client, service_client, dynamic_config
82+
):
83+
headers = {'x-envoy-immediate-health-check-fail': ['true']}
84+
await update_graceful_shutdown_headers(service_client, dynamic_config, True, headers)
85+
86+
start = f'Python{MD_ONE_REQ}{MD_TWO_REQ}'
87+
end = f'{MD_TWO_RES}{MD_ONE_RES}'
88+
89+
call = grpc_client.SayHelloStreams(
90+
_prepare_late_requests(service_daemon_instance, ['Python', '!', '!'], 0.2),
91+
wait_for_ready=True,
92+
)
93+
async for response in call:
94+
assert response.greeting == f'Hello, {start}{end}'
95+
start += f'!{MD_ONE_REQ}{MD_TWO_REQ}'
96+
97+
check_not_present(await call.initial_metadata(), headers)
98+
check_present(await call.trailing_metadata(), headers)
99+
100+
# After a couple more seconds, the service will start shutting down.
101+
service_daemon_instance.process.wait()
102+
103+
104+
async def _prepare_requests(names, sleep=1):
105+
reqs = []
106+
for name in names:
107+
reqs.append(greeter_protos.GreetingRequest(name=name))
108+
for req in reqs:
109+
await asyncio.sleep(sleep)
110+
yield req
111+
112+
113+
async def _prepare_late_requests(service_daemon_instance, names, sleep=1):
114+
reqs = []
115+
for name in names:
116+
reqs.append(greeter_protos.GreetingRequest(name=name))
117+
i = 0
118+
for req in reqs:
119+
if i == 1:
120+
service_daemon_instance.process.send_signal(SIGTERM)
121+
i += 1
122+
await asyncio.sleep(sleep)
123+
yield req
124+
125+
126+
async def update_graceful_shutdown_headers(service_client, dynamic_config, headers_enabled, headers):
127+
dynamic_config.set_values({'GRACEFUL_SHUTDOWN_HEADERS': {'enabled': headers_enabled, 'headers': headers}})
128+
await service_client.update_server_state()
129+
130+
55131
def check_present(metadata, headers: dict[str, list[str]]):
56132
metadata_dict = to_dict(metadata)
57133
for k, v in headers.items():

grpc/src/ugrpc/server/middlewares/graceful_shutdown_headers/middleware.cpp

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,35 @@ USERVER_NAMESPACE_BEGIN
77

88
namespace ugrpc::server::middlewares::graceful_shutdown_headers {
99

10+
namespace {
11+
12+
struct Identity {};
13+
14+
inline const utils::AnyStorageDataTag<ugrpc::server::StorageContext, Identity> kGracefulShutdownHeadersSet;
15+
16+
using AddMetadataMethod = decltype(&grpc::ServerContext::AddInitialMetadata);
17+
18+
bool SetHeadersIfNeeded(
19+
const dynamic_config::Snapshot& config,
20+
MiddlewareCallContext& context,
21+
AddMetadataMethod add_metadata_method
22+
) {
23+
const auto& graceful_shutdown_headers = config[::dynamic_config::GRACEFUL_SHUTDOWN_HEADERS];
24+
if (!graceful_shutdown_headers.enabled) {
25+
return false;
26+
}
27+
auto& server_context = context.GetServerContext();
28+
for (const auto& [name, values] : graceful_shutdown_headers.headers.extra) {
29+
auto header_name = ugrpc::impl::ToGrpcString(name);
30+
for (const auto& value : values) {
31+
(server_context.*add_metadata_method)(header_name, ugrpc::impl::ToGrpcString(value));
32+
}
33+
}
34+
return true;
35+
}
36+
37+
} // namespace
38+
1039
Middleware::Middleware(const components::State& state, dynamic_config::Source source)
1140
: state_(state),
1241
source_(std::move(source))
@@ -16,18 +45,20 @@ void Middleware::OnCallStart(MiddlewareCallContext& context) const {
1645
if (state_.GetServiceLifetimeStage() != components::ServiceLifetimeStage::kGracefulShutdown) {
1746
return;
1847
}
19-
const auto& config = context.GetInitialDynamicConfig();
20-
const auto& graceful_shutdown_headers = config[::dynamic_config::GRACEFUL_SHUTDOWN_HEADERS];
21-
if (!graceful_shutdown_headers.enabled) {
48+
if (SetHeadersIfNeeded(context.GetInitialDynamicConfig(), context, &grpc::ServerContext::AddInitialMetadata)) {
49+
context.GetStorageContext().Set(kGracefulShutdownHeadersSet, {});
50+
}
51+
}
52+
53+
void Middleware::PreSendStatus(MiddlewareCallContext& context, grpc::Status&) const {
54+
if (state_.GetServiceLifetimeStage() != components::ServiceLifetimeStage::kGracefulShutdown) {
2255
return;
2356
}
24-
auto& server_context = context.GetServerContext();
25-
for (const auto& [name, values] : graceful_shutdown_headers.headers.extra) {
26-
auto header_name = ugrpc::impl::ToGrpcString(name);
27-
for (const auto& value : values) {
28-
server_context.AddInitialMetadata(header_name, ugrpc::impl::ToGrpcString(value));
29-
}
57+
if (context.GetStorageContext().GetOptional(kGracefulShutdownHeadersSet) != nullptr) {
58+
return;
3059
}
60+
61+
SetHeadersIfNeeded(source_.GetSnapshot(), context, &grpc::ServerContext::AddTrailingMetadata);
3162
}
3263

3364
} // namespace ugrpc::server::middlewares::graceful_shutdown_headers

grpc/src/ugrpc/server/middlewares/graceful_shutdown_headers/middleware.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class Middleware final : public MiddlewareBase {
1313
Middleware(const components::State& state, dynamic_config::Source source);
1414

1515
void OnCallStart(MiddlewareCallContext& context) const override;
16+
void PreSendStatus(MiddlewareCallContext& context, grpc::Status&) const override;
1617

1718
private:
1819
const components::State state_;

0 commit comments

Comments
 (0)