@@ -1021,13 +1021,13 @@ public long getId() {
10211021
10221022 @ Override
10231023 public void beforeCommitted (TransactionState txnState ) throws TransactionException {
1024- boolean shouldReleaseLock = false ;
10251024 writeLock ();
1025+ boolean passCheck = false ;
10261026 try {
10271027 if (runningStreamTask .getIsCanceled ().get ()) {
1028- log . info ("streaming insert job {} task {} is canceled, skip beforeCommitted" ,
1029- getJobId (), runningStreamTask .getTaskId ());
1030- return ;
1028+ throw new TransactionException ("streaming insert job " + getJobId ()
1029+ + " task " + runningStreamTask .getTaskId ()
1030+ + " is canceled, txn " + txnState . getTransactionId () + " could not be committed" ) ;
10311031 }
10321032
10331033 ArrayList <Long > taskIds = new ArrayList <>();
@@ -1046,7 +1046,6 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
10461046 runningStreamTask .getTaskId (),
10471047 runningStreamTask .getScanBackendIds ());
10481048
1049-
10501049 if (StringUtils .isBlank (offsetJson )) {
10511050 throw new TransactionException ("Cannot find offset for attachment, load job id is "
10521051 + runningStreamTask .getTaskId ());
@@ -1059,8 +1058,9 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
10591058 loadStatistic .getFileNumber (),
10601059 loadStatistic .getTotalFileSizeB (),
10611060 offsetJson ));
1061+ passCheck = true ;
10621062 } finally {
1063- if (shouldReleaseLock ) {
1063+ if (! passCheck ) {
10641064 writeUnlock ();
10651065 }
10661066 }
0 commit comments