diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 163830e447515f..a4dcd877ac3f73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1660,6 +1660,14 @@ protected void unprotectUpsertTransactionState(TransactionState transactionState if (idToRunningTransactionState.put(transactionState.getTransactionId(), transactionState) == null) { runningTxnNums++; } + if (isReplay && transactionState.getSubTxnIds() != null) { + LOG.info("add sub transactions for txn_id={}, status={}, sub_txn_ids={}", + transactionState.getTransactionId(), transactionState.getTransactionStatus(), + transactionState.getSubTxnIds()); + for (Long subTxnId : transactionState.getSubTxnIds()) { + addSubTransaction(transactionState.getTransactionId(), subTxnId); + } + } } else { if (idToRunningTransactionState.remove(transactionState.getTransactionId()) != null) { runningTxnNums--; @@ -1670,6 +1678,11 @@ protected void unprotectUpsertTransactionState(TransactionState transactionState } else { finalStatusTransactionStateDequeLong.add(transactionState); } + if (transactionState.getSubTxnIds() != null) { + LOG.info("clean sub transactions for txn_id={}, sub_txn_ids={}", transactionState.getTransactionId(), + transactionState.getSubTxnIds()); + cleanSubTransactions(transactionState.getTransactionId()); + } } updateTxnLabels(transactionState); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index a189aba68116b8..82c79e9f6f00de 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -357,13 +357,13 @@ public void testSubTransaction() throws UserException { long subTransactionId3 = transactionState6.getSubTxnIds().get(2); TransactionState subTransactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, subTransactionId3); - Assert.assertEquals(transactionState6, subTransactionState); + Assert.assertEquals(null, subTransactionState); // finished txn will remove sub txn map // test show transaction state command - List> singleTranInfos = masterDbTransMgr.getSingleTranInfo(CatalogTestUtil.testDbId1, + /*List> singleTranInfos = masterDbTransMgr.getSingleTranInfo(CatalogTestUtil.testDbId1, subTransactionId3); Assert.assertEquals(1, singleTranInfos.size()); List txnInfo = singleTranInfos.get(0); - Assert.assertEquals(String.valueOf(transactionId6), txnInfo.get(0)); + Assert.assertEquals(String.valueOf(transactionId6), txnInfo.get(0));*/ // test get table transaction info: table_id to partition_id map List> tableTransInfos = masterDbTransMgr.getTableTransInfo(transactionId6); diff --git a/regression-test/suites/insert_p0/transaction/txn_insert_restart_fe.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_restart_fe.groovy index bd478bc2359959..9a743bfc9859ba 100644 --- a/regression-test/suites/insert_p0/transaction/txn_insert_restart_fe.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert_restart_fe.groovy @@ -44,6 +44,7 @@ suite("txn_insert_restart_fe", 'docker') { options.feConfigs.add('sys_log_verbose_modules=org.apache.doris') // options.beConfigs.add('sys_log_verbose_modules=*') options.beConfigs.add('enable_java_support=false') + options.beConfigs.add('pending_data_expire_time_sec=1') docker(options) { // ---------- test restart fe ---------- def result = sql 'SELECT DATABASE()'