author    Imran Rashid <irashid@cloudera.com>    2015-11-10 16:50:22 -0800
committer Andrew Or <andrew@databricks.com>      2015-11-10 16:50:22 -0800
commit    33112f9c48680c33d663978f76806ebf0ea39789 (patch)
tree      73a6beb5668ee20752189a026e035a76ab0f15c8
parent    c0e48dfa611fa5d94132af7e6f6731f60ab833da (diff)
[SPARK-10192][CORE] simple test w/ failure involving a shared dependency
Just trying to increase test coverage in the scheduler; this already works. It includes a regression test for SPARK-9809. Some test utils were copied from https://github.com/apache/spark/pull/5636, so we can wait until that is merged first.

Author: Imran Rashid <irashid@cloudera.com>

Closes #8402 from squito/test_retry_in_shared_shuffle_dep.
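For context, the scenario the new test models looks roughly like the following user-level sketch (illustrative only, not part of this patch): two jobs share one shuffled RDD, so the second job normally reuses the first job's map output and only regenerates it after a fetch failure. The object name and the local master setting are assumptions made for the example.

import org.apache.spark.{SparkConf, SparkContext}

object SharedShuffleDepSketch {
  def main(args: Array[String]): Unit = {
    // Local context purely for illustration.
    val sc = new SparkContext(
      new SparkConf().setAppName("shared-shuffle-dep").setMaster("local[2]"))

    // reduceByKey introduces a single shuffle dependency.
    val shuffled = sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _)

    // Job 1: consumes the shuffle output.
    shuffled.collect()

    // Job 2: reuses the same shuffle dependency. Its map stage is normally
    // "skipped", but a fetch failure here forces the DAGScheduler to
    // regenerate the missing map output, which is what the test exercises.
    shuffled.map { case (k, v) => (k, v * 2) }.collect()

    sc.stop()
  }
}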
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 51
1 file changed, 49 insertions(+), 2 deletions(-)
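The main change to the test helper in the diff below is a defaulted partitionToResult parameter: existing callers keep getting 42 for every partition, while new tests can tell partitions apart. A standalone sketch of that pattern (names here are illustrative, not the suite's real helpers):

object PartitionResultSketch extends App {
  // Defaulted function parameter: old call sites stay unchanged,
  // new call sites can vary the result per partition index.
  def resultsForStage(
      numPartitions: Int,
      partitionToResult: Int => Int = _ => 42): Seq[Int] = {
    (0 until numPartitions).map(partitionToResult)
  }

  // Old behaviour: every partition yields 42.
  assert(resultsForStage(2) == Seq(42, 42))

  // New behaviour, mirroring the reused-shuffle test's `idx => idx + 1234`.
  assert(resultsForStage(2, idx => idx + 1234) == Seq(1234, 1235))
}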
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3816b8c4a0..068b49bd58 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -594,11 +594,17 @@ class DAGSchedulerSuite
    * @param stageId - The current stageId
    * @param attemptIdx - The current attempt count
    */
-  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+  private def completeNextResultStageWithSuccess(
+      stageId: Int,
+      attemptIdx: Int,
+      partitionToResult: Int => Int = _ => 42): Unit = {
     val stageAttempt = taskSets.last
     checkStageId(stageId, attemptIdx, stageAttempt)
     assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
-    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, partitionToResult(idx))
+    }
+    complete(stageAttempt, taskResults.toSeq)
   }

   /**
@@ -1055,6 +1061,47 @@ class DAGSchedulerSuite
   }

   /**
+   * Run two jobs, with a shared dependency. We simulate a fetch failure in the second job, which
+   * requires regenerating some outputs of the shared dependency. One key aspect of this test is
+   * that the second job actually uses a different stage for the shared dependency (a "skipped"
+   * stage).
+   */
+  test("shuffle fetch failure in a reused shuffle dependency") {
+    // Run the first job successfully, which creates one shuffle dependency
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    assertDataStructuresEmpty()
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage. If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    // the scheduler now creates a new task set to regenerate the missing map output, but this time
+    // using a different stage, the "skipped" one
+
+    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
+    // the shuffle map output is still available from stage 0); make sure we've still got internal
+    // accumulators setup
+    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
+  /**
    * This test runs a three stage job, with a fetch failure in stage 1. but during the retry, we
    * have completions from both the first & second attempt of stage 1. So all the map output is
    * available before we finish any task set for stage 1. We want to make sure that we don't