Skip to content

Commit 6c1da8c

Browse files
committed
feat core: add congestion-control-sensor testpoints
commit_hash:fc35a41d16b98e629ca3a4740eae5c355cd2fde5
1 parent 81c182b commit 6c1da8c

3 files changed

Lines changed: 113 additions & 10 deletions

File tree

core/src/congestion_control/watchdog.cpp

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,36 @@ void Watchdog::Check() {
4444
auto cis = cis_.Lock();
4545
for (const auto& ci : *cis) {
4646
auto data = ci.sensor.FetchCurrent();
47+
48+
TESTPOINT_CALLBACK_NONCORO(
49+
"congestion-control-sensor",
50+
formats::json::Value{},
51+
tp_,
52+
[&data](const formats::json::Value& doc) {
53+
data.current_load = doc["current-load"].As<std::uint64_t>(data.current_load);
54+
data.overload_events_count = doc["overload-events-count"].As<std::uint64_t>(data.overload_events_count);
55+
data.no_overload_events_count =
56+
doc["no-overload-events-count"].As<std::uint64_t>(data.no_overload_events_count);
57+
LOG_INFO() << "Forcing CC Sensor data from testpoint: " << doc;
58+
}
59+
);
60+
4761
ci.controller.Feed(data);
62+
4863
auto limit = ci.controller.GetLimit();
4964

5065
TESTPOINT_CALLBACK_NONCORO(
5166
"congestion-control",
5267
formats::json::Value{},
5368
tp_,
5469
[&limit](const formats::json::Value& doc) {
55-
limit.load_limit = doc["force-rps-limit"].As<std::optional<size_t>>();
56-
LOG_ERROR() << "Forcing RPS limit from testpoint: " << limit.load_limit;
70+
if (doc.HasMember("force-rps-limit")) {
71+
limit.load_limit = doc["force-rps-limit"].As<std::optional<std::size_t>>();
72+
LOG_ERROR() << "Forcing RPS limit from testpoint: " << limit.load_limit;
73+
}
5774
}
5875
);
76+
5977
ci.limiter.SetLimit(limit);
6078

6179
TESTPOINT_NONCORO("congestion-control-apply", formats::json::Value{}, tp_);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,38 @@
1+
from typing import Any
2+
13
import pytest
24

35
import samples.greeter_pb2_grpc as greeter_pb2_grpc
46

57
pytest_plugins = ['pytest_userver.plugins.grpc']
68

79

10+
@pytest.fixture(scope='session')
11+
def service_env():
12+
return {'CPU_LIMIT': '1c'}
13+
14+
15+
@pytest.fixture(scope='session')
16+
def congestion_control_fake_mode() -> bool:
17+
return False
18+
19+
20+
@pytest.fixture(scope='session')
21+
def dynamic_config_fallback_patch() -> dict[str, Any]:
22+
return {
23+
'USERVER_RPS_CCONTROL_ENABLED': True,
24+
'USERVER_RPS_CCONTROL': {
25+
'min-limit': 1,
26+
'up-rate-percent': 100,
27+
'down-rate-percent': 100,
28+
'overload-on-seconds': 1,
29+
'overload-off-seconds': 1,
30+
'no-limit-seconds': 1,
31+
'start-limit-factor': 0.01,
32+
},
33+
}
34+
35+
836
@pytest.fixture
937
def grpc_client(grpc_channel):
1038
return greeter_pb2_grpc.GreeterServiceStub(grpc_channel)

grpc/functional_tests/basic_chaos/tests-nonchaos/test_basic.py

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,22 @@
44
import samples.greeter_pb2 as greeter_pb2
55

66

7-
async def test_grpc_cc_enabled(grpc_client, service_client, testpoint):
7+
async def test_grpc_congestion_control_limit(grpc_client, service_client, testpoint):
8+
@testpoint('congestion-control-sensor')
9+
def tp_congestion_control_sensor(data):
10+
return {}
11+
812
@testpoint('congestion-control')
9-
def tp_cc_enable(data):
13+
def tp_congestion_control(data):
1014
return {'force-rps-limit': 0}
1115

1216
@testpoint('congestion-control-apply')
13-
def tp_cc_apply(data):
17+
def tp_congestion_control_apply(data):
1418
return {}
1519

1620
# wait until server obtains the new limit, up to 1 second
1721
await service_client.enable_testpoints()
18-
await tp_cc_enable.wait_call()
19-
await tp_cc_apply.wait_call()
22+
await tp_congestion_control.wait_call()
2023

2124
# Random non-ping handler is throttled
2225
with pytest.raises(grpc.RpcError) as error:
@@ -27,11 +30,65 @@ def tp_cc_apply(data):
2730

2831
# A hack to disable CC for other tests
2932
@testpoint('congestion-control')
30-
def tp_cc_disable(data):
33+
def tp_congestion_control_disable(data):
34+
return {'force-rps-limit': None}
35+
36+
await tp_congestion_control_disable.wait_call()
37+
38+
39+
async def test_grpc_congestion_control_activate(grpc_client, service_client, testpoint):
40+
@testpoint('congestion-control-sensor')
41+
def tp_congestion_control_sensor(data):
42+
return {
43+
'current-load': 100,
44+
'overload-events-count': 4,
45+
'no-overload-events-count': 96,
46+
}
47+
48+
@testpoint('congestion-control')
49+
def tp_congestion_control(data):
50+
return {}
51+
52+
@testpoint('congestion-control-apply')
53+
def tp_congestion_control_apply(data):
3154
return {}
3255

33-
await tp_cc_disable.wait_call()
34-
await tp_cc_apply.wait_call()
56+
# is_overloaded -> true
57+
await service_client.enable_testpoints()
58+
await tp_congestion_control_sensor.wait_call()
59+
await tp_congestion_control.wait_call()
60+
61+
# current_limit -> 1
62+
await tp_congestion_control_sensor.wait_call()
63+
await tp_congestion_control.wait_call()
64+
65+
with pytest.raises(grpc.RpcError) as error:
66+
for i in range(0, 2):
67+
request = greeter_pb2.GreetingRequest(name=f'Name0{i}/1')
68+
await grpc_client.SayHello(request, wait_for_ready=True)
69+
assert error.value.details() == 'Congestion control: rate limit exceeded'
70+
assert error.value.code() == grpc.StatusCode.RESOURCE_EXHAUSTED
71+
72+
@testpoint('congestion-control-sensor')
73+
def tp_congestion_control_sensor_disable(data):
74+
return {
75+
'current-load': 100,
76+
'overload-events-count': 0,
77+
'no-overload-events-count': 100,
78+
}
79+
80+
# current_limit -> 1
81+
await tp_congestion_control_sensor_disable.wait_call()
82+
await tp_congestion_control.wait_call()
83+
84+
# current_limit -> null
85+
await tp_congestion_control_sensor_disable.wait_call()
86+
await tp_congestion_control.wait_call()
87+
88+
for i in range(1, 10):
89+
request = greeter_pb2.GreetingRequest(name=f'Name{i}/2')
90+
response = await grpc_client.SayHello(request, wait_for_ready=True)
91+
assert response.greeting == f'Hello, Name{i}/2!'
3592

3693

3794
async def test_grpc_cancellation(grpc_client, service_client, testpoint):

0 commit comments

Comments
 (0)