author     Reynold Xin <rxin@databricks.com>      2014-12-03 16:28:24 -0800
committer  Patrick Wendell <pwendell@gmail.com>   2014-12-03 16:29:00 -0800
commit     fe28ee2d13e0799120136419deec094752d2a370 (patch)
tree       cfacb969f53249c725ac00b083a4be9632f14b24
parent     6b6b7791d544376f8010b20e839c1627a71c69cb (diff)
[SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.
cc aarondav kayousterhout pwendell  This should go into 1.2?

Author: Reynold Xin <rxin@databricks.com>

Closes #3579 from rxin/SPARK-4085 and squashes the following commits:

255b4fd [Reynold Xin] Updated test.
f9814d9 [Reynold Xin] Code review feedback.
2afaf35 [Reynold Xin] [SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.

(cherry picked from commit 1826372d0a1bc80db9015106dd5d2d155ada33f5)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
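The essence of the fix: buf.createInputStream() is now wrapped in scala.util.Try, so an IOException thrown while opening a missing local shuffle file becomes a Failure value that the iterator's consumer can turn into a FetchFailedException (and hence a map-stage retry), instead of an uncaught exception that simply fails the reduce task. Below is a minimal, standalone sketch of that Try pattern; the stand-in for ManagedBuffer.createInputStream() is hypothetical, not Spark's API.

import java.io.{ByteArrayInputStream, IOException, InputStream}
import scala.util.Try

object TryPatternSketch {
  // Hypothetical stand-in for ManagedBuffer.createInputStream(): throws when
  // the backing local file is missing, as in SPARK-4085.
  def createInputStream(fileExists: Boolean): InputStream =
    if (fileExists) new ByteArrayInputStream(Array[Byte](42))
    else throw new IOException("local shuffle file not found")

  def main(args: Array[String]): Unit = {
    // Try captures the thrown IOException as a Failure; .map runs only on
    // Success, so the downstream read/deserialize step is skipped on failure.
    val ok  = Try(createInputStream(fileExists = true)).map(_.read())
    val bad = Try(createInputStream(fileExists = false)).map(_.read())
    println(ok)   // Success(42)
    println(bad)  // Failure(java.io.IOException: local shuffle file not found)
  }
}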
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 28
-rw-r--r--  core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala         |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala                        | 23
3 files changed, 40 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 83170f7c5a..2499c11a65 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -17,6 +17,7 @@
package org.apache.spark.storage
+import java.io.{InputStream, IOException}
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
@@ -289,17 +290,22 @@ final class ShuffleBlockFetcherIterator(
}
val iteratorTry: Try[Iterator[Any]] = result match {
- case FailureFetchResult(_, e) => Failure(e)
- case SuccessFetchResult(blockId, _, buf) => {
- val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
- val iter = serializer.newInstance().deserializeStream(is).asIterator
- Success(CompletionIterator[Any, Iterator[Any]](iter, {
- // Once the iterator is exhausted, release the buffer and set currentResult to null
- // so we don't release it again in cleanup.
- currentResult = null
- buf.release()
- }))
- }
+ case FailureFetchResult(_, e) =>
+ Failure(e)
+ case SuccessFetchResult(blockId, _, buf) =>
+ // There is a chance that createInputStream can fail (e.g. fetching a local file that does
+ // not exist, SPARK-4085). In that case, we should propagate the right exception so
+ // the scheduler gets a FetchFailedException.
+ Try(buf.createInputStream()).map { is0 =>
+ val is = blockManager.wrapForCompression(blockId, is0)
+ val iter = serializer.newInstance().deserializeStream(is).asIterator
+ CompletionIterator[Any, Iterator[Any]](iter, {
+ // Once the iterator is exhausted, release the buffer and set currentResult to null
+ // so we don't release it again in cleanup.
+ currentResult = null
+ buf.release()
+ })
+ }
}
(result.blockId, iteratorTry)
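On the success path above, the deserialized iterator is wrapped in Spark's CompletionIterator so the network buffer is released exactly once, when the iterator is exhausted. A hedged sketch of that pattern follows; it is a simplified re-implementation for illustration, not the real class in org.apache.spark.util.

// Run a cleanup callback exactly once, when the wrapped iterator is exhausted.
class CompletionIteratorSketch[A](sub: Iterator[A], completion: => Unit) extends Iterator[A] {
  private var completed = false
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) { completed = true; completion }
    more
  }
  override def next(): A = sub.next()
}

object CompletionIteratorSketch {
  def main(args: Array[String]): Unit = {
    val it = new CompletionIteratorSketch(Iterator(1, 2, 3), println("buffer released"))
    it.foreach(println)  // prints 1, 2, 3, then "buffer released"
  }
}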
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 55799f5514..a66fa2195a 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark
-import java.util.concurrent.atomic.AtomicInteger
-
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkContext._
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 5d20b4dc15..d8e4765edf 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleBlockId}
import org.apache.spark.util.MutablePair
abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
@@ -264,6 +265,28 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
}
}
}
+
+ test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file") {
+ val myConf = conf.clone().set("spark.test.noStageRetry", "false")
+ sc = new SparkContext("local", "test", myConf)
+ val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
+ rdd.count()
+
+ // Delete one of the local shuffle blocks.
+ val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+ val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+ assert(hashFile.exists() || sortFile.exists())
+
+ if (hashFile.exists()) {
+ hashFile.delete()
+ }
+ if (sortFile.exists()) {
+ sortFile.delete()
+ }
+
+ // This count should retry the execution of the previous stage and rerun shuffle.
+ rdd.count()
+ }
}
object ShuffleSuite {
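Why the new test deletes whichever of two files exists: the hash-based shuffle writes one output file per (shuffleId, mapId, reduceId), located via ShuffleBlockId, while the sort-based shuffle writes a single consolidated data file per map task, located via ShuffleDataBlockId, and which file is present depends on the configured shuffle manager. A small sketch of the naming difference; the output comments assume the block-id name fields as defined in Spark 1.2.

import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}

object BlockNameSketch {
  def main(args: Array[String]): Unit = {
    println(ShuffleBlockId(0, 0, 0).name)      // shuffle_0_0_0
    println(ShuffleDataBlockId(0, 0, 0).name)  // shuffle_0_0_0.data
  }
}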