Skip to content

Commit 8de7a90

Browse files
authored
[refactor](fragment mgr) move report logic to pipeline fragment context to remove callback parameter from ctor (#62500) (#62549)
1 parent bb97072 commit 8de7a90

6 files changed

Lines changed: 276 additions & 322 deletions

File tree

be/src/exec/pipeline/pipeline_fragment_context.cpp

Lines changed: 266 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#include "exec/pipeline/pipeline_fragment_context.h"
1919

2020
#include <gen_cpp/DataSinks_types.h>
21+
#include <gen_cpp/FrontendService.h>
22+
#include <gen_cpp/FrontendService_types.h>
2123
#include <gen_cpp/PaloInternalService_types.h>
2224
#include <gen_cpp/PlanNodes_types.h>
2325
#include <pthread.h>
@@ -26,6 +28,9 @@
2628
#include <cstdlib>
2729
// IWYU pragma: no_include <bits/chrono.h>
2830
#include <fmt/format.h>
31+
#include <thrift/Thrift.h>
32+
#include <thrift/protocol/TDebugProtocol.h>
33+
#include <thrift/transport/TTransportException.h>
2934

3035
#include <chrono> // IWYU pragma: keep
3136
#include <map>
@@ -124,24 +129,24 @@
124129
#include "runtime/runtime_state.h"
125130
#include "runtime/thread_context.h"
126131
#include "service/backend_options.h"
132+
#include "util/client_cache.h"
127133
#include "util/countdown_latch.h"
128134
#include "util/debug_util.h"
135+
#include "util/network_util.h"
129136
#include "util/uid_util.h"
130137

131138
namespace doris {
132139
#include "common/compile_check_begin.h"
133140
PipelineFragmentContext::PipelineFragmentContext(
134141
TUniqueId query_id, const TPipelineFragmentParams& request,
135142
std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
136-
const std::function<void(RuntimeState*, Status*)>& call_back,
137-
report_status_callback report_status_cb)
143+
const std::function<void(RuntimeState*, Status*)>& call_back)
138144
: _query_id(std::move(query_id)),
139145
_fragment_id(request.fragment_id),
140146
_exec_env(exec_env),
141147
_query_ctx(std::move(query_ctx)),
142148
_call_back(call_back),
143149
_is_report_on_cancel(true),
144-
_report_status_cb(std::move(report_status_cb)),
145150
_params(request),
146151
_parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
147152
_need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
@@ -1966,6 +1971,256 @@ std::string PipelineFragmentContext::get_first_error_msg() {
19661971
return "";
19671972
}
19681973

1974+
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
1975+
std::stringstream url;
1976+
url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
1977+
<< "/api/_download_load?"
1978+
<< "token=" << _exec_env->token() << "&file=" << file_name;
1979+
return url.str();
1980+
}
1981+
1982+
// Report this fragment's execution status to the coordinator FE via the
// reportExecStatus Thrift RPC. Populates TReportExecStatusParams from the
// fragment's runtime state(s) (load counters, delta/export files, commit and
// error tablet infos, table-format commit datas, unreported errors), then
// sends it, retrying once through a reopened connection on transport errors.
// Any unrecoverable failure cancels the query via req.cancel_fn.
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
    // Debug-only fault injection: delay the report to exercise FE-side timeout paths.
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
    });

    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
    if (req.coord_addr.hostname == "external") {
        // External query (flink/spark read tablets) does not need to report to FE.
        return;
    }
    // Retry obtaining an FE client up to 10 times, sleeping 1s between attempts.
    int callback_retries = 10;
    const int sleep_ms = 1000;
    Status exec_status = req.status;
    Status coord_status;
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
    do {
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
                                                            req.coord_addr, &coord_status);
        if (!coord_status.ok()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
        }
    } while (!coord_status.ok() && callback_retries-- > 0);

    // Could not reach the coordinator at all: cancel the query with a
    // descriptive error (the cancel_fn result is intentionally discarded).
    if (!coord_status.ok()) {
        UniqueId uid(req.query_id.hi, req.query_id.lo);
        static_cast<void>(req.cancel_fn(Status::InternalError(
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
        return;
    }

    // ---- Assemble the report payload ----
    TReportExecStatusParams params;
    params.protocol_version = FrontendServiceVersion::V1;
    params.__set_query_id(req.query_id);
    params.__set_backend_num(req.backend_num);
    params.__set_fragment_instance_id(req.fragment_instance_id);
    params.__set_fragment_id(req.fragment_id);
    params.__set_status(exec_status.to_thrift());
    params.__set_done(req.done);
    params.__set_query_type(req.runtime_state->query_type());
    // Profile is reported through a separate channel; never attached here.
    params.__isset.profile = false;

    DCHECK(req.runtime_state != nullptr);

    if (req.runtime_state->query_type() == TQueryType::LOAD) {
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
    } else {
        DCHECK(!req.runtime_states.empty());
        // Non-LOAD queries may produce output files (e.g. SELECT INTO OUTFILE);
        // expose them as downloadable HTTP links.
        if (!req.runtime_state->output_files().empty()) {
            params.__isset.delta_urls = true;
            for (auto& it : req.runtime_state->output_files()) {
                params.delta_urls.push_back(_to_http_path(it));
            }
        }
        if (!params.delta_urls.empty()) {
            params.__isset.delta_urls = true;
        }
    }

    // Load-counter keys; "dpp.*" names look like legacy counter identifiers —
    // NOTE(review): kept verbatim, FE parses these exact strings.
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
    static std::string s_unselected_rows = "unselected.rows";
    int64_t num_rows_load_success = 0;
    int64_t num_rows_load_filtered = 0;
    int64_t num_rows_load_unselected = 0;
    // Prefer the merged top-level runtime_state's counters; fall back to
    // summing each per-instance runtime state when the top-level one is empty.
    if (req.runtime_state->num_rows_load_total() > 0 ||
        req.runtime_state->num_rows_load_filtered() > 0 ||
        req.runtime_state->num_finished_range() > 0) {
        params.__isset.load_counters = true;

        num_rows_load_success = req.runtime_state->num_rows_load_success();
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
        params.__isset.fragment_instance_reports = true;
        TFragmentInstanceReport t;
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
        params.fragment_instance_reports.push_back(t);
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
                rs->num_finished_range() > 0) {
                params.__isset.load_counters = true;
                num_rows_load_success += rs->num_rows_load_success();
                num_rows_load_filtered += rs->num_rows_load_filtered();
                num_rows_load_unselected += rs->num_rows_load_unselected();
                params.__isset.fragment_instance_reports = true;
                TFragmentInstanceReport t;
                t.__set_fragment_instance_id(rs->fragment_instance_id());
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
                t.__set_loaded_rows(rs->num_rows_load_total());
                t.__set_loaded_bytes(rs->num_bytes_load_total());
                params.fragment_instance_reports.push_back(t);
            }
        }
    }
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));

    if (!req.load_error_url.empty()) {
        params.__set_tracking_url(req.load_error_url);
    }
    if (!req.first_error_msg.empty()) {
        params.__set_first_error_msg(req.first_error_msg);
    }
    // WAL-based group-commit loads carry their txn id / label on the instance
    // runtime states; the last instance with wal_id > 0 wins.
    for (auto* rs : req.runtime_states) {
        if (rs->wal_id() > 0) {
            params.__set_txn_id(rs->wal_id());
            params.__set_label(rs->import_label());
        }
    }
    // For each of the result collections below the pattern is the same:
    // take the merged top-level runtime_state's data when present, otherwise
    // gather from the per-instance runtime states.
    if (!req.runtime_state->export_output_files().empty()) {
        params.__isset.export_files = true;
        params.export_files = req.runtime_state->export_output_files();
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (!rs->export_output_files().empty()) {
                params.__isset.export_files = true;
                params.export_files.insert(params.export_files.end(),
                                           rs->export_output_files().begin(),
                                           rs->export_output_files().end());
            }
        }
    }
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
        params.__isset.commitInfos = true;
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
                params.__isset.commitInfos = true;
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
            }
        }
    }
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
        params.__isset.errorTabletInfos = true;
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
                params.__isset.errorTabletInfos = true;
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
                                               rs_eti.end());
            }
        }
    }
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
        params.__isset.hive_partition_updates = true;
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
                                             hpu.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
                params.__isset.hive_partition_updates = true;
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
                                                     rs_hpu.begin(), rs_hpu.end());
            }
        }
    }
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
        params.__isset.iceberg_commit_datas = true;
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
                                           icd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
                params.__isset.iceberg_commit_datas = true;
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
                                                   rs_icd.begin(), rs_icd.end());
            }
        }
    }

    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
        params.__isset.mc_commit_datas = true;
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
                params.__isset.mc_commit_datas = true;
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
                                              rs_mcd.end());
            }
        }
    }

    // Drain error messages not yet reported to FE.
    req.runtime_state->get_unreported_errors(&(params.error_log));
    params.__isset.error_log = (!params.error_log.empty());

    // backend_id == 0 presumably means "not yet assigned" — only report when set.
    if (_exec_env->cluster_info()->backend_id != 0) {
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
    }

    // ---- Send the report ----
    TReportExecStatusResult res;
    Status rpc_status;

    VLOG_DEBUG << "reportExecStatus params is "
               << apache::thrift::ThriftDebugString(params).c_str();
    if (!exec_status.ok()) {
        LOG(WARNING) << "report error status: " << exec_status.msg()
                     << " to coordinator: " << req.coord_addr
                     << ", query id: " << print_id(req.query_id);
    }
    try {
        try {
            (*coord)->reportExecStatus(res, params);
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
            // Transport failure: reopen the cached connection and retry once.
#ifndef ADDRESS_SANITIZER
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
                         << req.coord_addr << ", err: " << e.what();
#endif
            rpc_status = coord->reopen();

            if (!rpc_status.ok()) {
                req.cancel_fn(rpc_status);
                return;
            }
            (*coord)->reportExecStatus(res, params);
        }

        // RPC delivered: adopt the status FE returned.
        rpc_status = Status::create<false>(res.status);
    } catch (apache::thrift::TException& e) {
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
    }

    // Any residual RPC failure cancels the query so it does not run unreported.
    if (!rpc_status.ok()) {
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
                 print_id(req.query_id), rpc_status.to_string());
        req.cancel_fn(rpc_status);
    }
}
2223+
19692224
Status PipelineFragmentContext::send_report(bool done) {
19702225
Status exec_status = _query_ctx->exec_status();
19712226
// If plan is done successfully, but _is_report_success is false,
@@ -2017,9 +2272,14 @@ Status PipelineFragmentContext::send_report(bool done) {
20172272
.load_error_url = load_eror_url,
20182273
.first_error_msg = first_error_msg,
20192274
.cancel_fn = [this](const Status& reason) { cancel(reason); }};
2020-
2021-
return _report_status_cb(
2022-
req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
2275+
auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
2276+
return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
2277+
SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
2278+
_coordinator_callback(req);
2279+
if (!req.done) {
2280+
ctx->refresh_next_report_time();
2281+
}
2282+
});
20232283
}
20242284

20252285
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {

be/src/exec/pipeline/pipeline_fragment_context.h

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,9 @@ class Dependency;
5252
class PipelineFragmentContext : public TaskExecutionContext {
5353
public:
5454
ENABLE_FACTORY_CREATOR(PipelineFragmentContext);
55-
// Callback to report execution status of plan fragment.
56-
// 'profile' is the cumulative profile, 'done' indicates whether the execution
57-
// is done or still continuing.
58-
// Note: this does not take a const RuntimeProfile&, because it might need to call
59-
// functions like PrettyPrint() or to_thrift(), neither of which is const
60-
// because they take locks.
61-
using report_status_callback = std::function<Status(
62-
const ReportStatusRequest, std::shared_ptr<PipelineFragmentContext>&&)>;
6355
PipelineFragmentContext(TUniqueId query_id, const TPipelineFragmentParams& request,
6456
std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
65-
const std::function<void(RuntimeState*, Status*)>& call_back,
66-
report_status_callback report_status_cb);
57+
const std::function<void(RuntimeState*, Status*)>& call_back);
6758

6859
~PipelineFragmentContext() override;
6960

@@ -157,6 +148,9 @@ class PipelineFragmentContext : public TaskExecutionContext {
157148
}
158149

159150
private:
151+
void _coordinator_callback(const ReportStatusRequest& req);
152+
std::string _to_http_path(const std::string& file_name) const;
153+
160154
void _release_resource();
161155

162156
Status _build_and_prepare_full_pipeline(ThreadPool* thread_pool);
@@ -257,14 +251,6 @@ class PipelineFragmentContext : public TaskExecutionContext {
257251
std::atomic_bool _disable_period_report = true;
258252
std::atomic_uint64_t _previous_report_time = 0;
259253

260-
// This callback is used to notify the FE of the status of the fragment.
261-
// For example:
262-
// 1. when the fragment is cancelled, it will be called.
263-
// 2. when the fragment is finished, it will be called. especially, when the fragment is
264-
// a insert into select statement, it should notfiy FE every fragment's status.
265-
// And also, this callback is called periodly to notify FE the load process.
266-
report_status_callback _report_status_cb;
267-
268254
DescriptorTbl* _desc_tbl = nullptr;
269255
int _num_instances = 1;
270256

0 commit comments

Comments
 (0)