Skip to content

Commit 925840d

Browse files
branch-4.1: [fix](insert) fix INSERT job statistics lost in show load after FE restart #62331 (#62546)
Cherry-picked from #62331 Co-authored-by: hui lai <laihui@selectdb.com>
1 parent 26e0115 commit 925840d

File tree

3 files changed

+116
-1
lines changed

3 files changed

+116
-1
lines changed

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ public class InsertLoadJob extends LoadJob {
4848
@SerializedName("tid")
4949
private long tableId;
5050

51+
// Snapshot of loadStatistic.toJson() captured when the job finishes.
52+
// loadStatistic is not persisted (no @SerializedName), so we save it here
53+
// to survive FE restarts.
54+
@SerializedName("jdj")
55+
private String jobDetailsJson = null;
56+
5157
// only for log replay
5258
public InsertLoadJob() {
5359
super(EtlJobType.INSERT);
@@ -91,6 +97,9 @@ public void setJobProperties(long transactionId, long tableId, long createTimest
9197
this.loadingStatus.setTrackingUrl(trackingUrl);
9298
this.loadingStatus.setFirstErrorMsg(firstErrorMsg);
9399
this.userInfo = userInfo;
100+
// Snapshot the current loadStatistic so it survives FE restarts.
101+
// loadStatistic itself is not annotated with @SerializedName and won't be persisted.
102+
this.jobDetailsJson = this.loadStatistic.toJson();
94103
}
95104

96105
public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {
@@ -116,4 +125,14 @@ public Set<String> getTableNames() throws MetaNotFoundException {
116125
throw e;
117126
}
118127
}
128+
129+
@Override
130+
protected String getJobDetailsJson() {
131+
// Use the persisted snapshot when loadStatistic is empty (e.g. after FE restart).
132+
// Fall back to the live loadStatistic during execution.
133+
if (jobDetailsJson != null) {
134+
return jobDetailsJson;
135+
}
136+
return loadStatistic.toJson();
137+
}
119138
}

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ protected List<Comparable> getShowInfoUnderLock() throws DdlException {
790790
jobInfo.add(TimeUtils.longToTimeString(finishTimestamp));
791791
// tracking url
792792
jobInfo.add(loadingStatus.getTrackingUrl());
793-
jobInfo.add(loadStatistic.toJson());
793+
jobInfo.add(getJobDetailsJson());
794794
// transaction id
795795
jobInfo.add(transactionId);
796796
// error tablets
@@ -820,6 +820,10 @@ public String getResourceName() {
820820
return "N/A";
821821
}
822822

823+
protected String getJobDetailsJson() {
824+
return loadStatistic.toJson();
825+
}
826+
823827
protected long getEtlStartTimestamp() {
824828
return loadStartTimestamp;
825829
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
20+
// Verify that INSERT job statistics (ScannedRows, LoadBytes, etc.) shown in
21+
// SHOW LOAD are preserved after a FE restart. Before the fix, loadStatistic
22+
// was not serialized to the edit log, so all counters reset to 0 after restart.
23+
suite("test_insert_statistic_after_fe_restart", "docker") {
24+
def options = new ClusterOptions()
25+
options.setFeNum(1)
26+
docker(options) {
27+
def dbName = "test_insert_statistic_restart_db"
28+
def srcTbl = "src_tbl"
29+
def dstTbl = "dst_tbl"
30+
31+
sql """DROP DATABASE IF EXISTS ${dbName}"""
32+
sql """CREATE DATABASE ${dbName}"""
33+
sql """USE ${dbName}"""
34+
35+
sql """
36+
CREATE TABLE ${srcTbl} (
37+
k1 INT NULL,
38+
k2 VARCHAR(50) NULL
39+
) ENGINE=OLAP
40+
DUPLICATE KEY(k1)
41+
DISTRIBUTED BY HASH(k1) BUCKETS 3
42+
PROPERTIES ("replication_num" = "1")
43+
"""
44+
sql """
45+
CREATE TABLE ${dstTbl} (
46+
k1 INT NULL,
47+
k2 VARCHAR(50) NULL
48+
) ENGINE=OLAP
49+
DUPLICATE KEY(k1)
50+
DISTRIBUTED BY HASH(k1) BUCKETS 3
51+
PROPERTIES ("replication_num" = "1")
52+
"""
53+
54+
// Insert enough rows so ScannedRows and LoadBytes are clearly non-zero
55+
sql """INSERT INTO ${srcTbl} SELECT number, concat('value_', number)
56+
FROM numbers('number'='1000')"""
57+
58+
// insert into select — this creates the INSERT load job tracked by show load
59+
sql """INSERT INTO ${dstTbl} SELECT * FROM ${srcTbl}"""
60+
61+
def result = sql """SHOW LOAD FROM ${dbName}"""
62+
assertEquals(1, result.size())
63+
def jobDetailsBefore = parseJson(result[0][14])
64+
logger.info("JobDetails before restart: ${result[0][14]}")
65+
66+
assertTrue(jobDetailsBefore.ScannedRows > 0,
67+
"ScannedRows should be > 0 before restart, got ${jobDetailsBefore.ScannedRows}")
68+
assertTrue(jobDetailsBefore.LoadBytes > 0,
69+
"LoadBytes should be > 0 before restart, got ${jobDetailsBefore.LoadBytes}")
70+
71+
// Restart FE and reconnect
72+
cluster.restartFrontends()
73+
sleep(30000)
74+
context.reconnectFe()
75+
76+
sql """USE ${dbName}"""
77+
78+
result = sql """SHOW LOAD FROM ${dbName}"""
79+
assertEquals(1, result.size())
80+
def jobDetailsAfter = parseJson(result[0][14])
81+
logger.info("JobDetails after restart: ${result[0][14]}")
82+
83+
assertEquals(jobDetailsBefore.ScannedRows, jobDetailsAfter.ScannedRows,
84+
"ScannedRows changed after FE restart: before=${jobDetailsBefore.ScannedRows}, after=${jobDetailsAfter.ScannedRows}")
85+
assertEquals(jobDetailsBefore.LoadBytes, jobDetailsAfter.LoadBytes,
86+
"LoadBytes changed after FE restart: before=${jobDetailsBefore.LoadBytes}, after=${jobDetailsAfter.LoadBytes}")
87+
assertEquals(jobDetailsBefore.FileNumber, jobDetailsAfter.FileNumber,
88+
"FileNumber changed after FE restart")
89+
assertEquals(jobDetailsBefore.FileSize, jobDetailsAfter.FileSize,
90+
"FileSize changed after FE restart")
91+
}
92+
}

0 commit comments

Comments
 (0)