aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala58
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala2
3 files changed, 54 insertions, 9 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 416efaa75b..bc58fb2a36 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -210,7 +210,8 @@ class AppClientSuite
execAddedList.add(id)
}
- def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
+ def executorRemoved(
+ id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
execRemovedList.add(id)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 33824749ae..6787b30261 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
@@ -201,7 +202,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def beforeEach(): Unit = {
super.beforeEach()
- sc = new SparkContext("local", "DAGSchedulerSuite")
+ init(new SparkConf())
+ }
+
+ private def init(testConf: SparkConf): Unit = {
+ sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
@@ -621,6 +626,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}
+ private val shuffleFileLossTests = Seq(
+ ("slave lost with shuffle service", SlaveLost("", false), true, false),
+ ("worker lost with shuffle service", SlaveLost("", true), true, true),
+ ("worker lost without shuffle service", SlaveLost("", true), false, true),
+ ("executor failure with shuffle service", ExecutorKilled, true, false),
+ ("executor failure without shuffle service", ExecutorKilled, false, true))
+
+ for ((eventDescription, event, shuffleServiceOn, expectFileLoss) <- shuffleFileLossTests) {
+ val maybeLost = if (expectFileLoss) {
+ "lost"
+ } else {
+ "not lost"
+ }
+ test(s"shuffle files $maybeLost when $eventDescription") {
+ // reset the test context with the right shuffle service config
+ afterEach()
+ val conf = new SparkConf()
+ conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+ init(conf)
+ assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
+
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+ submit(reduceRdd, Array(0))
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostB", 1))))
+ runEvent(ExecutorLost("exec-hostA", event))
+ if (expectFileLoss) {
+ intercept[MetadataFetchFailedException] {
+ mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
+ }
+ } else {
+ assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+ HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+ }
+ }
+ }
// Helper function to validate state when creating tests for task failures
private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
@@ -628,7 +673,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(stageAttempt.stageAttemptId == attempt)
}
-
// Helper functions to extract commonly used code in Fetch Failure test cases
private def setupStageAbortTest(sc: SparkContext) {
sc.listenerBus.addListener(new EndListener())
@@ -1110,7 +1154,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// pretend we were told hostA went away
val oldEpoch = mapOutputTracker.getEpoch
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
@@ -1241,7 +1285,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
))
// then one executor dies, and a task fails in stage 1
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
@@ -1339,7 +1383,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
makeMapStatus("hostA", reduceRdd.partitions.length)))
// now that host goes down
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
// so we resubmit those tasks
runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
@@ -1532,7 +1576,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
submit(reduceRdd, Array(0))
// blockManagerMaster.removeExecutor("exec-hostA")
// pretend we were told hostA went away
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
// rather than marking it is as failed and waiting.
complete(taskSets(0), Seq(
@@ -1999,7 +2043,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// Pretend host A was lost
val oldEpoch = mapOutputTracker.getEpoch
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 36d1c5690f..7d6ad08036 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -46,7 +46,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
override def executorAdded(execId: String, host: String) {}
- override def executorLost(execId: String) {}
+ override def executorLost(execId: String, reason: ExecutorLossReason) {}
override def taskSetFailed(
taskSet: TaskSet,