diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 5cce3a3a8b..a399127dd9 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -227,7 +227,7 @@ public void registerShuffle( taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber); try { long start = System.currentTimeMillis(); - shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId); + shuffleServer.getShuffleTaskManager().quickRemoveShuffleDataSync(appId, shuffleId); LOG.info( "Deleted the previous stage attempt data due to stage recomputing for app: {}, " + "shuffleId: {}. It costs {} ms", diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index b482584236..3d4e94478c 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -779,6 +779,16 @@ public boolean isAppExpired(String appId) { * @param shuffleIds */ public void removeResourcesByShuffleIds(String appId, List shuffleIds) { + removeResourcesByShuffleIds(appId, shuffleIds, false); + } + + /** + * Clear up the partial resources of shuffleIds of App. + * + * @param appId + * @param shuffleIds + */ + public void removeResourcesByShuffleIds(String appId, List shuffleIds, boolean isQuick) { Lock writeLock = getAppWriteLock(appId); writeLock.lock(); try { @@ -811,7 +821,7 @@ public void removeResourcesByShuffleIds(String appId, List shuffleIds) withTimeoutExecution( () -> { storageManager.removeResources( - new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds)); + new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds), isQuick); return null; }, storageRemoveOperationTimeoutSec, @@ -998,6 +1008,16 @@ public void removeShuffleDataSync(String appId, int shuffleId) { removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId)); } + /** + * Delete all data under the shuffleId using the synchronous quick delete mode. + * + * @param appId + * @param shuffleId + */ + public void quickRemoveShuffleDataSync(String appId, int shuffleId) { + removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true); + } + public ShuffleDataDistributionType getDataDistributionType(String appId) { return shuffleTaskInfos.get(appId).getDataDistType(); } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java index 99c055e5fa..9f075d8e1e 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java @@ -21,11 +21,13 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.function.Supplier; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; +import com.google.common.collect.Queues; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -49,6 +51,7 @@ import org.apache.uniffle.storage.common.HadoopStorage; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.factory.ShuffleHandlerFactory; +import org.apache.uniffle.storage.handler.AsynchronousDeleteEvent; import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler; import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest; import org.apache.uniffle.storage.util.ShuffleStorageUtils; @@ -65,12 +68,48 @@ public class HadoopStorageManager extends SingleStorageManager { private Map appIdToStorages = JavaUtils.newConcurrentMap(); private Map pathToStorages = JavaUtils.newConcurrentMap(); private final boolean isStorageAuditLogEnabled; + private final BlockingQueue quickNeedDeletePaths = + Queues.newLinkedBlockingQueue(); + private Thread clearNeedDeleteLocalPathThread; HadoopStorageManager(ShuffleServerConf conf) { super(conf); hadoopConf = conf.getHadoopConf(); shuffleServerId = conf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID, "shuffleServerId"); isStorageAuditLogEnabled = conf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED); + Runnable clearNeedDeletePathTask = + () -> { + while (true) { + AsynchronousDeleteEvent asynchronousDeleteEvent = null; + try { + asynchronousDeleteEvent = quickNeedDeletePaths.take(); + ShuffleDeleteHandler deleteHandler = + ShuffleHandlerFactory.getInstance() + .createShuffleDeleteHandler( + new CreateShuffleDeleteHandlerRequest( + StorageType.HDFS.name(), + asynchronousDeleteEvent.getConf(), + shuffleServerId)); + deleteHandler.delete( + asynchronousDeleteEvent.getNeedDeleteRenamePaths(), + asynchronousDeleteEvent.getAppId(), + asynchronousDeleteEvent.getUser()); + + } catch (Exception e) { + if (asynchronousDeleteEvent != null) { + LOG.error( + "Delete Path {} failed.", + asynchronousDeleteEvent.getNeedDeleteRenamePaths(), + e); + } else { + LOG.error("Failed to delete a directory in clearNeedDeleteHadoopPathThread.", e); + } + } + } + }; + clearNeedDeleteLocalPathThread = new Thread(clearNeedDeletePathTask); + clearNeedDeleteLocalPathThread.setName("clearNeedDeleteHadoopPathThread"); + clearNeedDeleteLocalPathThread.setDaemon(true); } @Override @@ -99,6 +138,11 @@ public Storage selectStorage(ShuffleDataReadEvent event) { @Override public void removeResources(PurgeEvent event) { + removeResources(event, false); + } + + @Override + public void removeResources(PurgeEvent event, boolean isQuick) { String appId = event.getAppId(); HadoopStorage storage = getStorageByAppId(appId); if (storage != null) { @@ -149,7 +193,19 @@ public void removeResources(PurgeEvent event) { storage.getStoragePath())); } } - deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser()); + if (isQuick) { + AsynchronousDeleteEvent asynchronousDeleteEvent = + new AsynchronousDeleteEvent( + appId, event.getUser(), storage.getConf(), event.getShuffleIds(), deletePaths); + deleteHandler.quickDelete(asynchronousDeleteEvent); + boolean isSucess = quickNeedDeletePaths.offer(asynchronousDeleteEvent); + if (!isSucess) { + LOG.warn( + "Remove the case where the clearNeedDeleteHadoopPathThread queue is full and cannot accept elements."); + } + } else { + deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser()); + } removeAppStorageInfo(event); } else { LOG.warn("Storage gotten is null when removing resources for event: {}", event); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java index 05b83e558f..3a7afc0efc 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java @@ -158,9 +158,13 @@ public Map getStorageInfo() { } public void removeResources(PurgeEvent event) { + removeResources(event, false); + } + + public void removeResources(PurgeEvent event, boolean isQuick) { LOG.info("Start to remove resource of {}", event); - warmStorageManager.removeResources(event); - coldStorageManager.removeResources(event); + warmStorageManager.removeResources(event, isQuick); + coldStorageManager.removeResources(event, isQuick); } public StorageManager getColdStorageManager() { diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 6be2af7406..c045c6f034 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -41,6 +42,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Queues; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -69,6 +71,7 @@ import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.common.StorageMediaProvider; import org.apache.uniffle.storage.factory.ShuffleHandlerFactory; +import org.apache.uniffle.storage.handler.AsynchronousDeleteEvent; import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler; import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest; import org.apache.uniffle.storage.util.ShuffleStorageUtils; @@ -91,6 +94,9 @@ public class LocalStorageManager extends SingleStorageManager { private final List typeProviders = Lists.newArrayList(); private final boolean isStorageAuditLogEnabled; + private final BlockingQueue quickNeedDeletePaths = + Queues.newLinkedBlockingQueue(); + private Thread clearNeedDeleteLocalPathThread; @VisibleForTesting LocalStorageManager(ShuffleServerConf conf) { @@ -176,6 +182,37 @@ public class LocalStorageManager extends SingleStorageManager { localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList()))); this.checker = new LocalStorageChecker(conf, localStorages); isStorageAuditLogEnabled = conf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED); + + Runnable clearNeedDeletePathTask = + () -> { + while (true) { + AsynchronousDeleteEvent asynchronousDeleteEvent = null; + ShuffleDeleteHandler deleteHandler = + ShuffleHandlerFactory.getInstance() + .createShuffleDeleteHandler( + new CreateShuffleDeleteHandlerRequest( + StorageType.LOCALFILE.name(), new Configuration())); + try { + asynchronousDeleteEvent = quickNeedDeletePaths.take(); + deleteHandler.delete( + asynchronousDeleteEvent.getNeedDeleteRenamePaths(), + asynchronousDeleteEvent.getAppId(), + asynchronousDeleteEvent.getUser()); + } catch (Exception e) { + if (asynchronousDeleteEvent != null) { + LOG.error( + "Delete Path {} failed.", + asynchronousDeleteEvent.getNeedDeleteRenamePaths(), + e); + } else { + LOG.error("Failed to delete a directory in clearNeedDeleteHadoopPathThread.", e); + } + } + } + }; + clearNeedDeleteLocalPathThread = new Thread(clearNeedDeletePathTask); + clearNeedDeleteLocalPathThread.setName("clearNeedDeleteLocalPathThread"); + clearNeedDeleteLocalPathThread.setDaemon(true); } private StorageMedia getStorageTypeForBasePath(String basePath) { @@ -267,6 +304,11 @@ public Checker getStorageChecker() { @Override public void removeResources(PurgeEvent event) { + removeResources(event, false); + } + + @Override + public void removeResources(PurgeEvent event, boolean isQuick) { String appId = event.getAppId(); String user = event.getUser(); List shuffleSet = @@ -328,8 +370,19 @@ public void removeResources(PurgeEvent event) { } }) .collect(Collectors.toList()); - - deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user); + if (isQuick) { + AsynchronousDeleteEvent asynchronousDeleteEvent = + new AsynchronousDeleteEvent( + appId, event.getUser(), null, event.getShuffleIds(), deletePaths); + deleteHandler.quickDelete(asynchronousDeleteEvent); + boolean isSucess = quickNeedDeletePaths.offer(asynchronousDeleteEvent); + if (!isSucess) { + LOG.warn( + "Remove the case where the clearNeedDeleteHadoopPathThread queue is full and cannot accept elements."); + } + } else { + deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user); + } removeAppStorageInfo(event); } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java index 70425a22d8..6bbf4f6639 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java @@ -44,6 +44,8 @@ public interface StorageManager { void removeResources(PurgeEvent event); + void removeResources(PurgeEvent event, boolean isQuick); + void start(); void stop(); diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/AsynchronousDeleteEvent.java b/storage/src/main/java/org/apache/uniffle/storage/handler/AsynchronousDeleteEvent.java new file mode 100644 index 0000000000..74a2e107f2 --- /dev/null +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/AsynchronousDeleteEvent.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.storage.handler; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; + +public class AsynchronousDeleteEvent { + private static final String TEMPORARYSUFFIX = "_tmp"; + private String appId; + private String user; + private List shuffleIds; + private Configuration conf; + /** Records the mapping between the path to be deleted and the path to be renamed. */ + private Map needDeletePathAndRenamePath; + + public AsynchronousDeleteEvent( + String appId, + String user, + Configuration conf, + List shuffleIds, + List needDeletePath) { + this.appId = appId; + this.user = user; + this.shuffleIds = shuffleIds; + this.conf = conf; + this.needDeletePathAndRenamePath = + needDeletePath.stream() + .collect( + Collectors.toMap(Function.identity(), s -> StringUtils.join(s, TEMPORARYSUFFIX))); + } + + public String getAppId() { + return appId; + } + + public String getUser() { + return user; + } + + public List getShuffleIds() { + return shuffleIds; + } + + public Configuration getConf() { + return conf; + } + + public Map getNeedDeletePathAndRenamePath() { + return needDeletePathAndRenamePath; + } + + public String[] getNeedDeleteRenamePaths() { + return needDeletePathAndRenamePath.values().stream().toArray(String[]::new); + } +} diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java index c0b32b9432..23278a8e7e 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java @@ -17,6 +17,8 @@ package org.apache.uniffle.storage.handler.api; +import org.apache.uniffle.storage.handler.AsynchronousDeleteEvent; + public interface ShuffleDeleteHandler { /** @@ -25,4 +27,7 @@ public interface ShuffleDeleteHandler { * @param appId ApplicationId for delete */ void delete(String[] storageBasePaths, String appId, String user); + + /** Rename the file and then delete it asynchronously. */ + void quickDelete(AsynchronousDeleteEvent shuffleQuickPurgeEvent); } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java index 312f052f9a..8a3119939e 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java @@ -19,6 +19,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; @@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider; +import org.apache.uniffle.storage.handler.AsynchronousDeleteEvent; import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler; public class HadoopShuffleDeleteHandler implements ShuffleDeleteHandler { @@ -118,4 +120,55 @@ private void delete(FileSystem fileSystem, Path path, String filePrefix) throws fileSystem.delete(path, true); } } + + @Override + public void quickDelete(AsynchronousDeleteEvent asynchronousDeleteEvent) { + String appId = asynchronousDeleteEvent.getAppId(); + String user = asynchronousDeleteEvent.getUser(); + for (Map.Entry appIdNeedDeletePaths : + asynchronousDeleteEvent.getNeedDeletePathAndRenamePath().entrySet()) { + final Path path = new Path(appIdNeedDeletePaths.getKey()); + final Path breakdownPathFolder = new Path(appIdNeedDeletePaths.getValue()); + boolean isSuccess = false; + int times = 0; + int retryMax = 5; + long start = System.currentTimeMillis(); + LOG.info( + "Try rename shuffle data in Hadoop FS for appId[{}] of user[{}] with {}", + appId, + user, + path); + while (!isSuccess && times < retryMax) { + try { + FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf); + isSuccess = fileSystem.rename(path, breakdownPathFolder); + } catch (Exception e) { + if (e instanceof FileNotFoundException) { + LOG.info("[{}] doesn't exist, ignore it.", path); + return; + } + times++; + LOG.warn("Can't rename shuffle data for appId[{}] with {} times", appId, times, e); + try { + Thread.sleep(1000); + } catch (Exception ex) { + LOG.warn("Exception happened when Thread.sleep", ex); + } + } + } + if (isSuccess) { + LOG.info( + "Rename shuffle data in Hadoop FS for appId[{}] with {} successfully in {} ms", + appId, + path, + (System.currentTimeMillis() - start)); + } else { + LOG.warn( + "Failed to rename shuffle data in Hadoop FS for appId [{}] with {} successfully in {} ms", + appId, + path, + (System.currentTimeMillis() - start)); + } + } + } } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java index 277fb18d7a..fcb00804de 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java @@ -18,11 +18,13 @@ package org.apache.uniffle.storage.handler.impl; import java.io.File; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.storage.handler.AsynchronousDeleteEvent; import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler; public class LocalFileDeleteHandler implements ShuffleDeleteHandler { @@ -50,4 +52,42 @@ public void delete(String[] shuffleDataStoredPath, String appId, String user) { } } } + + @Override + public void quickDelete(AsynchronousDeleteEvent asynchronousDeleteEvent) { + + for (Map.Entry appIdNeedDeletePaths : + asynchronousDeleteEvent.getNeedDeletePathAndRenamePath().entrySet()) { + String appId = appIdNeedDeletePaths.getKey(); + String shufflePath = appIdNeedDeletePaths.getKey(); + String breakdownShufflePath = appIdNeedDeletePaths.getValue(); + long start = System.currentTimeMillis(); + try { + File baseFolder = new File(shufflePath); + File breakdownBaseFolder = new File(breakdownShufflePath); + boolean isSuccess = baseFolder.renameTo(breakdownBaseFolder); + if (isSuccess) { + LOG.info( + "Rename shuffle data for appId[{}] with {} to {} cost {} ms", + appId, + shufflePath, + breakdownShufflePath, + (System.currentTimeMillis() - start)); + } else { + LOG.warn( + "Can't Rename shuffle data for appId[{}] with {} to {}", + appId, + shufflePath, + breakdownShufflePath); + } + } catch (Exception e) { + LOG.error( + "Can't Rename shuffle data for appId[{}] with {} to {}", + appId, + shufflePath, + breakdownShufflePath, + e); + } + } + } }