Skip to content

Commit 8ea4fa8

Browse files
committed
refactor grpc: add finish time to finish AsyncMethodInvocation
commit_hash:74557a62a1db760e0d2531cdef97953da030f4ec
1 parent 68342af commit 8ea4fa8

10 files changed

Lines changed: 113 additions & 155 deletions

File tree

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#pragma once
22

3+
#include <chrono>
4+
#include <optional>
5+
36
#include <grpcpp/client_context.h>
47

58
#include <userver/ugrpc/impl/async_method_invocation.hpp>
@@ -8,31 +11,19 @@ USERVER_NAMESPACE_BEGIN
811

912
namespace ugrpc::client::impl {
1013

11-
class CallState;
12-
1314
/// AsyncMethodInvocation for Finish method that stops stats and Span timers
1415
/// ASAP, without waiting for a Task to wake up
1516
class FinishAsyncMethodInvocation final : public ugrpc::impl::AsyncMethodInvocation {
1617
public:
17-
explicit FinishAsyncMethodInvocation(CallState& state);
18-
19-
~FinishAsyncMethodInvocation() override;
20-
2118
void Notify(bool ok) noexcept override;
2219

20+
/// When notify is called, we store timestamp and later use it in statistics
21+
[[nodiscard]] std::chrono::steady_clock::time_point GetFinishTime() const;
22+
2323
private:
24-
CallState& state_;
24+
std::optional<std::chrono::steady_clock::time_point> finish_time_;
2525
};
2626

27-
ugrpc::impl::AsyncMethodInvocation::WaitStatus
28-
Wait(ugrpc::impl::AsyncMethodInvocation& invocation, grpc::ClientContext& context) noexcept;
29-
30-
ugrpc::impl::AsyncMethodInvocation::WaitStatus WaitUntil(
31-
ugrpc::impl::AsyncMethodInvocation& invocation,
32-
grpc::ClientContext& context,
33-
engine::Deadline deadline
34-
) noexcept;
35-
3627
} // namespace ugrpc::client::impl
3728

3829
USERVER_NAMESPACE_END

grpc/include/userver/ugrpc/client/impl/async_methods.hpp

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,46 +46,65 @@ google::protobuf::Message* ToBaseMessage(Message& message) {
4646
}
4747
}
4848

49+
ugrpc::impl::AsyncMethodInvocation::WaitStatus WaitAndTryCancelIfNeeded(
50+
ugrpc::impl::AsyncMethodInvocation& invocation,
51+
engine::Deadline deadline,
52+
grpc::ClientContext& context
53+
) noexcept;
54+
55+
ugrpc::impl::AsyncMethodInvocation::WaitStatus
56+
WaitAndTryCancelIfNeeded(ugrpc::impl::AsyncMethodInvocation& invocation, grpc::ClientContext& context) noexcept;
57+
4958
void CheckOk(CallState& state, AsyncMethodInvocation::WaitStatus status, std::string_view stage);
5059

5160
template <typename GrpcStream>
5261
void StartCall(GrpcStream& stream, CallState& state) {
5362
AsyncMethodInvocation start_call;
5463
stream.StartCall(start_call.GetTag());
55-
CheckOk(state, Wait(start_call, state.GetContext()), "StartCall");
64+
CheckOk(state, WaitAndTryCancelIfNeeded(start_call, state.GetContext()), "StartCall");
5665
}
5766

5867
void PrepareFinish(CallState& state);
5968

6069
void ProcessFinish(CallState& state, google::protobuf::Message* final_response);
6170

62-
void CheckFinishStatus(CallState& state);
71+
void ProcessFinishCancelled(CallState& state);
6372

64-
void ProcessFinishResult(
65-
CallState& state,
66-
AsyncMethodInvocation::WaitStatus wait_status,
67-
google::protobuf::Message* final_response,
68-
bool throw_on_error
69-
);
73+
void CheckFinishStatus(CallState& state);
7074

7175
template <typename GrpcStream>
7276
void Finish(GrpcStream& stream, CallState& state, google::protobuf::Message* final_response, bool throw_on_error) {
7377
PrepareFinish(state);
7478

75-
FinishAsyncMethodInvocation finish(state);
79+
FinishAsyncMethodInvocation finish;
7680
auto& status = state.GetStatus();
7781
stream.Finish(&status, finish.GetTag());
7882

79-
const auto wait_status = Wait(finish, state.GetContext());
80-
ProcessFinishResult(state, wait_status, final_response, throw_on_error);
83+
const auto wait_status = WaitAndTryCancelIfNeeded(finish, state.GetContext());
84+
if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
85+
ProcessFinishCancelled(state);
86+
// Finish AsyncMethodInvocation will be awaited in its destructor.
87+
if (throw_on_error) {
88+
throw RpcCancelledError(state.GetCallName(), "Finish");
89+
}
90+
}
91+
92+
UINVARIANT(
93+
impl::AsyncMethodInvocation::WaitStatus::kOk == wait_status, "Client-side Finish: ok should always be true"
94+
);
95+
state.GetStatsScope().SetFinishTime(finish.GetFinishTime());
96+
ProcessFinish(state, final_response);
97+
if (throw_on_error) {
98+
CheckFinishStatus(state);
99+
}
81100
}
82101

83102
template <typename GrpcStream, typename Response>
84103
[[nodiscard]] bool Read(GrpcStream& stream, Response& response, CallState& state) {
85104
UINVARIANT(state.IsReadAvailable(), "'impl::Read' called on a finished call");
86105
AsyncMethodInvocation read;
87106
stream.Read(&response, read.GetTag());
88-
const auto wait_status = Wait(read, state.GetContext());
107+
const auto wait_status = WaitAndTryCancelIfNeeded(read, state.GetContext());
89108
if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
90109
state.GetStatsScope().OnCancelled();
91110
}
@@ -105,7 +124,7 @@ bool Write(GrpcStream& stream, const Request& request, grpc::WriteOptions option
105124
UINVARIANT(state.IsWriteAvailable(), "'impl::Write' called on a stream that is closed for writes");
106125
AsyncMethodInvocation write;
107126
stream.Write(request, options, write.GetTag());
108-
const auto result = Wait(write, state.GetContext());
127+
const auto result = WaitAndTryCancelIfNeeded(write, state.GetContext());
109128
if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
110129
state.GetStatsScope().OnCancelled();
111130
}
@@ -120,7 +139,7 @@ void WriteAndCheck(GrpcStream& stream, const Request& request, grpc::WriteOption
120139
UINVARIANT(state.IsWriteAndCheckAvailable(), "'impl::WriteAndCheck' called on a finished or closed stream");
121140
AsyncMethodInvocation write;
122141
stream.Write(request, options, write.GetTag());
123-
CheckOk(state, Wait(write, state.GetContext()), "WriteAndCheck");
142+
CheckOk(state, WaitAndTryCancelIfNeeded(write, state.GetContext()), "WriteAndCheck");
124143
}
125144

126145
template <typename GrpcStream>
@@ -129,7 +148,7 @@ bool WritesDone(GrpcStream& stream, CallState& state) {
129148
state.SetWritesFinished();
130149
AsyncMethodInvocation writes_done;
131150
stream.WritesDone(writes_done.GetTag());
132-
const auto wait_status = Wait(writes_done, state.GetContext());
151+
const auto wait_status = WaitAndTryCancelIfNeeded(writes_done, state.GetContext());
133152
if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
134153
state.GetStatsScope().OnCancelled();
135154
}

grpc/include/userver/ugrpc/client/rpc.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,8 @@ template <typename RPC>
442442
StreamReadFuture<RPC>::~StreamReadFuture() {
443443
if (state_) {
444444
const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
445-
const auto wait_status = impl::Wait(state_->GetAsyncMethodInvocation(), state_->GetContext());
445+
const auto wait_status =
446+
impl::WaitAndTryCancelIfNeeded(state_->GetAsyncMethodInvocation(), state_->GetContext());
446447
if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
447448
if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
448449
state_->GetStatsScope().OnCancelled();
@@ -461,7 +462,7 @@ bool StreamReadFuture<RPC>::Get() {
461462
UINVARIANT(state_, "'Get' must be called only once");
462463
const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
463464
auto* const state = std::exchange(state_, nullptr);
464-
const auto result = impl::Wait(state->GetAsyncMethodInvocation(), state->GetContext());
465+
const auto result = impl::WaitAndTryCancelIfNeeded(state->GetAsyncMethodInvocation(), state->GetContext());
465466
if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
466467
state->GetStatsScope().OnCancelled();
467468
state->GetStatsScope().Flush();

grpc/include/userver/ugrpc/impl/statistics_scope.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ class RpcStatisticsScope final {
3030

3131
void OnNetworkError() noexcept;
3232

33+
void SetFinishTime(std::chrono::steady_clock::time_point finish_time) noexcept;
34+
3335
void Flush() noexcept;
3436

3537
// Not thread-safe with respect to Flush.
@@ -60,9 +62,9 @@ class RpcStatisticsScope final {
6062

6163
utils::NotNull<MethodStatistics*> statistics_;
6264
std::optional<std::chrono::steady_clock::time_point> start_time_;
65+
std::optional<std::chrono::steady_clock::time_point> finish_time_;
6366
FinishKind finish_kind_{FinishKind::kAutomatic};
6467
grpc::StatusCode finish_code_{};
65-
std::atomic<bool> is_cancelled_{false};
6668
bool is_deadline_propagated_{false};
6769
};
6870

grpc/src/ugrpc/client/impl/async_method_invocation.cpp

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,17 @@
11
#include <userver/ugrpc/client/impl/async_method_invocation.hpp>
22

3-
#include <userver/ugrpc/client/impl/call_state.hpp>
4-
53
USERVER_NAMESPACE_BEGIN
64

75
namespace ugrpc::client::impl {
86

9-
FinishAsyncMethodInvocation::FinishAsyncMethodInvocation(CallState& state) : state_(state) {}
10-
11-
FinishAsyncMethodInvocation::~FinishAsyncMethodInvocation() { WaitWhileBusy(); }
12-
137
void FinishAsyncMethodInvocation::Notify(bool ok) noexcept {
14-
if (ok) {
15-
try {
16-
auto& status = state_.GetStatus();
17-
state_.GetStatsScope().OnExplicitFinish(status.error_code());
18-
19-
if (status.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED && state_.IsDeadlinePropagated()) {
20-
state_.GetStatsScope().OnCancelledByDeadlinePropagation();
21-
}
22-
23-
state_.GetStatsScope().Flush();
24-
} catch (const std::exception& ex) {
25-
LOG_LIMITED_ERROR() << "Error in FinishAsyncMethodInvocation::Notify: " << ex;
26-
} catch (...) {
27-
// We have to catch any exception to notify a task in any case.
28-
LOG_LIMITED_ERROR() << "Error in FinishAsyncMethodInvocation::Notify: <non std::exception-based>";
29-
}
30-
}
8+
finish_time_ = std::chrono::steady_clock::now();
319
AsyncMethodInvocation::Notify(ok);
3210
}
3311

34-
ugrpc::impl::AsyncMethodInvocation::WaitStatus
35-
Wait(ugrpc::impl::AsyncMethodInvocation& invocation, grpc::ClientContext& context) noexcept {
36-
return impl::WaitUntil(invocation, context, engine::Deadline{});
37-
}
38-
39-
ugrpc::impl::AsyncMethodInvocation::WaitStatus WaitUntil(
40-
ugrpc::impl::AsyncMethodInvocation& invocation,
41-
grpc::ClientContext& context,
42-
engine::Deadline deadline
43-
) noexcept {
44-
const auto status = invocation.WaitUntil(deadline);
45-
if (status == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
46-
context.TryCancel();
47-
}
48-
49-
return status;
12+
std::chrono::steady_clock::time_point FinishAsyncMethodInvocation::GetFinishTime() const {
13+
UINVARIANT(finish_time_.has_value(), "GetFinishTime should be called after invocation was notified");
14+
return *finish_time_;
5015
}
5116

5217
} // namespace ugrpc::client::impl

grpc/src/ugrpc/client/impl/async_methods.cpp

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@ namespace ugrpc::client::impl {
1616

1717
namespace {
1818

19+
void ProcessCallStatistics(CallState& state, const grpc::Status& status) {
20+
auto& stats = state.GetStatsScope();
21+
stats.OnExplicitFinish(status.error_code());
22+
if (status.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED && state.IsDeadlinePropagated()) {
23+
stats.OnCancelledByDeadlinePropagation();
24+
}
25+
stats.Flush();
26+
}
27+
1928
void SetStatusAndResetSpan(CallState& state, const grpc::Status& status) {
2029
SetStatusForSpan(state.GetSpan(), status);
2130
state.ResetSpan();
@@ -28,6 +37,23 @@ void SetErrorAndResetSpan(CallState& state, std::string_view error_message) {
2837

2938
} // namespace
3039

40+
ugrpc::impl::AsyncMethodInvocation::WaitStatus WaitAndTryCancelIfNeeded(
41+
ugrpc::impl::AsyncMethodInvocation& invocation,
42+
engine::Deadline deadline,
43+
grpc::ClientContext& context
44+
) noexcept {
45+
const auto wait_status = invocation.WaitUntil(deadline);
46+
if (ugrpc::impl::AsyncMethodInvocation::WaitStatus::kCancelled == wait_status) {
47+
context.TryCancel();
48+
}
49+
return wait_status;
50+
}
51+
52+
ugrpc::impl::AsyncMethodInvocation::WaitStatus
53+
WaitAndTryCancelIfNeeded(ugrpc::impl::AsyncMethodInvocation& invocation, grpc::ClientContext& context) noexcept {
54+
return WaitAndTryCancelIfNeeded(invocation, engine::Deadline{}, context);
55+
}
56+
3157
void CheckOk(CallState& state, AsyncMethodInvocation::WaitStatus status, std::string_view stage) {
3258
if (status == impl::AsyncMethodInvocation::WaitStatus::kError) {
3359
state.SetFinished();
@@ -38,7 +64,7 @@ void CheckOk(CallState& state, AsyncMethodInvocation::WaitStatus status, std::st
3864
} else if (status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
3965
state.SetFinished();
4066
state.GetStatsScope().OnCancelled();
41-
// Cannot do stats_scope.Flush(), as it can be called in Notify concurrently.
67+
state.GetStatsScope().Flush();
4268
SetErrorAndResetSpan(state, fmt::format("Task cancellation at '{}'", stage));
4369
throw RpcCancelledError(state.GetCallName(), stage);
4470
}
@@ -52,8 +78,7 @@ void PrepareFinish(CallState& state) {
5278
void ProcessFinish(CallState& state, google::protobuf::Message* final_response) {
5379
const auto& status = state.GetStatus();
5480

55-
state.GetStatsScope().OnExplicitFinish(status.error_code());
56-
state.GetStatsScope().Flush();
81+
ProcessCallStatistics(state, status);
5782

5883
if (final_response && status.ok()) {
5984
MiddlewarePipeline::PostRecvMessage(state, *final_response);
@@ -63,43 +88,19 @@ void ProcessFinish(CallState& state, google::protobuf::Message* final_response)
6388
SetStatusAndResetSpan(state, status);
6489
}
6590

91+
void ProcessFinishCancelled(CallState& state) {
92+
state.GetStatsScope().OnCancelled();
93+
state.GetStatsScope().Flush();
94+
SetErrorAndResetSpan(state, "Task cancellation at 'Finish'");
95+
}
96+
6697
void CheckFinishStatus(CallState& state) {
6798
auto& status = state.GetStatus();
6899
if (!status.ok()) {
69100
ThrowErrorWithStatus(state.GetCallName(), std::move(status));
70101
}
71102
}
72103

73-
void ProcessFinishResult(
74-
CallState& state,
75-
AsyncMethodInvocation::WaitStatus wait_status,
76-
google::protobuf::Message* final_response,
77-
bool throw_on_error
78-
) {
79-
if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
80-
state.GetStatsScope().OnCancelled();
81-
// Cannot do stats_scope.Flush(), as it can be called in Notify concurrently.
82-
state.GetContext().TryCancel();
83-
SetErrorAndResetSpan(state, "Task cancellation at 'Finish'");
84-
// Finish AsyncMethodInvocation will be awaited in its destructor.
85-
if (throw_on_error) {
86-
throw RpcCancelledError(state.GetCallName(), "Finish");
87-
}
88-
return;
89-
}
90-
91-
UASSERT_MSG(
92-
wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk,
93-
"ok=false in async Finish method invocation is prohibited "
94-
"by gRPC docs, see grpc::CompletionQueue::Next"
95-
);
96-
97-
ProcessFinish(state, final_response);
98-
if (throw_on_error) {
99-
CheckFinishStatus(state);
100-
}
101-
}
102-
103104
} // namespace ugrpc::client::impl
104105

105106
USERVER_NAMESPACE_END

grpc/src/ugrpc/client/impl/call_state.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ void CallState::EmplaceFinishAsyncMethodInvocation() {
149149
std::holds_alternative<std::monostate>(invocation_),
150150
"Another method is already running for this RPC concurrently"
151151
);
152-
invocation_.emplace<FinishAsyncMethodInvocation>(*this);
152+
invocation_.emplace<FinishAsyncMethodInvocation>();
153153
}
154154

155155
AsyncMethodInvocation& CallState::GetAsyncMethodInvocation() noexcept {

grpc/src/ugrpc/client/rpc.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ engine::FutureStatus UnaryFinishFutureImpl::WaitUntil(engine::Deadline deadline)
5757
if (state_->IsFinishProcessed()) return engine::FutureStatus::kReady;
5858

5959
auto& finish = state_->GetFinishAsyncMethodInvocation();
60-
const auto wait_status = impl::WaitUntil(finish, state_->GetContext(), deadline);
61-
60+
const auto wait_status = impl::WaitAndTryCancelIfNeeded(finish, deadline, state_->GetContext());
6261
switch (wait_status) {
6362
case impl::AsyncMethodInvocation::WaitStatus::kOk:
6463
case impl::AsyncMethodInvocation::WaitStatus::kError:
@@ -68,6 +67,7 @@ engine::FutureStatus UnaryFinishFutureImpl::WaitUntil(engine::Deadline deadline)
6867
impl::AsyncMethodInvocation::WaitStatus::kOk == wait_status,
6968
"Client-side Finish: ok should always be true"
7069
);
70+
state_->GetStatsScope().SetFinishTime(finish.GetFinishTime());
7171
ProcessFinish(*state_, final_response_);
7272
} catch (...) {
7373
exception_ = std::current_exception();

0 commit comments

Comments
 (0)