author     Eric Liang <ekl@databricks.com>        2016-09-06 16:55:22 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2016-09-06 16:55:22 -0700
commit     c07cbb3534a57834b9b78e1572d40fb2af930f5f
tree       58f070a2d3d8f41591867fa3db0aff858da2d8e9
parent     175b4344112b376cbbbd05265125ed0e1b87d507
[SPARK-17371] Resubmitted shuffle outputs can get deleted by zombie map tasks
## What changes were proposed in this pull request?

It seems that old shuffle map tasks hanging around after a stage resubmit will delete the intended shuffle output files on stop(), causing downstream stages to fail even after the resubmitted stage completes successfully. This can happen easily if the prior map task is waiting on a network timeout when its stage is resubmitted.

This can cause unnecessary stage resubmits, sometimes several in a row as fetch failures trigger a cascade of shuffle file invalidations, along with confusing FetchFailure messages that report shuffle index files missing from the local disk.

Given that IndexShuffleBlockResolver commits data atomically, it seems unnecessary to ever delete committed task output: even in the rare case that a task fails after it has finished committing its shuffle output, it should be safe to retain that output.

## How was this patch tested?

Prior to the fix proposed in https://github.com/apache/spark/pull/14931, I was able to reproduce this behavior by killing slaves in the middle of a large shuffle. After this patch, stages were no longer resubmitted multiple times due to shuffle index loss.

cc JoshRosen vanzin

Author: Eric Liang <ekl@databricks.com>

Closes #14932 from ericl/dont-remove-committed-files.
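To make the race concrete, here is a minimal, self-contained Scala sketch. It does not use Spark's actual classes: `ZombieCleanupSketch`, its file-naming scheme, and the `commit`/`stopOldBehavior` helpers are simplified stand-ins for IndexShuffleBlockResolver and the shuffle writers. Because output files are keyed only by (shuffleId, mapId) and not by task attempt, the zombie attempt's cleanup deletes the file that the resubmitted attempt has already committed:

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object ZombieCleanupSketch {

  // Shuffle output is addressed only by (shuffleId, mapId); a resubmitted
  // attempt writes to the same path as the attempt it replaces.
  def dataFile(dir: Path, shuffleId: Int, mapId: Int): Path =
    dir.resolve(s"shuffle_${shuffleId}_${mapId}_0.data")

  // Atomic-commit style write: stage the data in a temp file, then rename
  // it into place, so the final path is either absent or complete.
  def commit(dir: Path, shuffleId: Int, mapId: Int, payload: String): Unit = {
    val tmp = Files.createTempFile(dir, "shuffle", ".tmp")
    Files.write(tmp, payload.getBytes("UTF-8"))
    Files.move(tmp, dataFile(dir, shuffleId, mapId), StandardCopyOption.REPLACE_EXISTING)
  }

  // Pre-patch behavior: a failed attempt removes output by (shuffleId, mapId)
  // in stop(success = false), regardless of which attempt wrote the file.
  def stopOldBehavior(dir: Path, shuffleId: Int, mapId: Int, success: Boolean): Unit =
    if (!success) Files.deleteIfExists(dataFile(dir, shuffleId, mapId))

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("shuffle-sketch")
    val (shuffleId, mapId) = (0, 7)

    // The resubmitted attempt finishes and commits its output.
    commit(dir, shuffleId, mapId, "output of the resubmitted attempt")

    // The zombie attempt (e.g. one that was stuck on a network timeout)
    // now fails and cleans up, deleting the committed file out from under
    // the downstream stage.
    stopOldBehavior(dir, shuffleId, mapId, success = false)

    // Prints false: the committed output is gone.
    println(Files.exists(dataFile(dir, shuffleId, mapId)))
  }
}
```

Since the rename-based commit already guarantees the output file is either absent or complete, dropping the cleanup call from stop() simply leaves any committed output in place, which is what the change below does.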
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 1 -
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 2 --
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 2 --
3 files changed, 0 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index cd6d64a1ee..0fcc56d50a 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -238,7 +238,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
partitionWriters = null;
}
}
- shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
return None$.empty();
}
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 876cb7f7d8..63d376b44f 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -465,8 +465,6 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
}
return Option.apply(mapStatus);
} else {
- // The map task failed, so delete our output data.
- shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
return Option.apply(null);
}
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 1adacabc86..cc01e6aa7e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -83,8 +83,6 @@ private[spark] class SortShuffleWriter[K, V, C](
if (success) {
return Option(mapStatus)
} else {
- // The map task failed, so delete our output data.
- shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
return None
}
} finally {