aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/network-common/pom.xml4
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java49
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java24
3 files changed, 61 insertions, 16 deletions
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 5444ae6d70..12d89273d7 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -41,6 +41,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
<!-- Provided dependencies -->
<dependency>
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index fbed2f053d..eb534eed24 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -29,6 +29,7 @@ import java.util.regex.Pattern;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.Unpooled;
+import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,14 +80,32 @@ public class JavaUtils {
return Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8);
}
- /*
+ /**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
- * Throws an exception if deletion is unsuccessful.
+ *
+ * @param file Input file / dir to be deleted
+ * @throws IOException if deletion is unsuccessful
*/
public static void deleteRecursively(File file) throws IOException {
if (file == null) { return; }
+ // On Unix systems, use operating system command to run faster
+ // If that does not work out, fallback to the Java IO way
+ if (SystemUtils.IS_OS_UNIX) {
+ try {
+ deleteRecursivelyUsingUnixNative(file);
+ return;
+ } catch (IOException e) {
+ logger.warn("Attempt to delete using native Unix OS command failed for path = {}. " +
+ "Falling back to Java IO way", file.getAbsolutePath(), e);
+ }
+ }
+
+ deleteRecursivelyUsingJavaIO(file);
+ }
+
+ private static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
if (file.isDirectory() && !isSymlink(file)) {
IOException savedIOException = null;
for (File child : listFilesSafely(file)) {
@@ -109,6 +128,32 @@ public class JavaUtils {
}
}
+ private static void deleteRecursivelyUsingUnixNative(File file) throws IOException {
+ ProcessBuilder builder = new ProcessBuilder("rm", "-rf", file.getAbsolutePath());
+ Process process = null;
+ int exitCode = -1;
+
+ try {
+ // In order to avoid deadlocks, consume the stdout (and stderr) of the process
+ builder.redirectErrorStream(true);
+ builder.redirectOutput(new File("/dev/null"));
+
+ process = builder.start();
+
+ exitCode = process.waitFor();
+ } catch (Exception e) {
+ throw new IOException("Failed to delete: " + file.getAbsolutePath(), e);
+ } finally {
+ if (process != null && process.isAlive()) {
+ process.destroy();
+ }
+ }
+
+ if (exitCode != 0 || file.exists()) {
+ throw new IOException("Failed to delete: " + file.getAbsolutePath());
+ }
+ }
+
private static File[] listFilesSafely(File file) throws IOException {
if (file.exists()) {
File[] files = file.listFiles();
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 62a1fb42b0..81e01949e5 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -27,12 +27,17 @@ import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Manages some sort-shuffle data, including the creation
* and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
*/
public class TestShuffleDataContext {
+ private static final Logger logger = LoggerFactory.getLogger(TestShuffleDataContext.class);
+
public final String[] localDirs;
public final int subDirsPerLocalDir;
@@ -53,7 +58,11 @@ public class TestShuffleDataContext {
public void cleanup() {
for (String localDir : localDirs) {
- deleteRecursively(new File(localDir));
+ try {
+ JavaUtils.deleteRecursively(new File(localDir));
+ } catch (IOException e) {
+ logger.warn("Unable to cleanup localDir = " + localDir, e);
+ }
}
}
@@ -92,17 +101,4 @@ public class TestShuffleDataContext {
public ExecutorShuffleInfo createExecutorInfo(String shuffleManager) {
return new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);
}
-
- private static void deleteRecursively(File f) {
- assert f != null;
- if (f.isDirectory()) {
- File[] children = f.listFiles();
- if (children != null) {
- for (File child : children) {
- deleteRecursively(child);
- }
- }
- }
- f.delete();
- }
}