aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
author Eric Liang <ekl@databricks.com> 2016-09-07 12:33:50 -0700
committer Josh Rosen <joshrosen@databricks.com> 2016-09-07 12:33:50 -0700
commit 649fa4bf1d6fc9271ae56b6891bc93ebf57858d1 (patch)
tree 90abb06091264b81769757b18a84e710a2ea7989 /core/src/test/scala/org/apache
parent 76ad89e9241fb2dece95dd445661dd95ee4ef699 (diff)
download spark-649fa4bf1d6fc9271ae56b6891bc93ebf57858d1.tar.gz
spark-649fa4bf1d6fc9271ae56b6891bc93ebf57858d1.tar.bz2
spark-649fa4bf1d6fc9271ae56b6891bc93ebf57858d1.zip
[SPARK-17370] Shuffle service files not invalidated when a slave is lost
## What changes were proposed in this pull request?

DAGScheduler invalidates shuffle files when an executor loss event occurs, but not when the external shuffle service is enabled. This is because when the shuffle service is on, the shuffle file lifetime can exceed the executor lifetime. However, it also doesn't invalidate shuffle files when the shuffle service itself is lost (due to whole slave loss). This can cause long hangs when slaves are lost since the file loss is not detected until a subsequent stage attempts to read the shuffle files. The proposed fix is to also invalidate shuffle files when an executor is lost due to a `SlaveLost` event.

## How was this patch tested?

Unit tests, also verified on an actual cluster that slave loss invalidates shuffle files immediately as expected.

cc mateiz

Author: Eric Liang <ekl@databricks.com>

Closes #14931 from ericl/sc-4439.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r-- core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala 3
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 58
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 2
3 files changed, 54 insertions, 9 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 416efaa75b..bc58fb2a36 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -210,7 +210,8 @@ class AppClientSuite
execAddedList.add(id)
}
- def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
+ def executorRemoved(
+ id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
execRemovedList.add(id)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 33824749ae..6787b30261 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
@@ -201,7 +202,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def beforeEach(): Unit = {
super.beforeEach()
- sc = new SparkContext("local", "DAGSchedulerSuite")
+ init(new SparkConf())
+ }
+
+ private def init(testConf: SparkConf): Unit = {
+ sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
@@ -621,6 +626,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}
+ private val shuffleFileLossTests = Seq(
+ ("slave lost with shuffle service", SlaveLost("", false), true, false),
+ ("worker lost with shuffle service", SlaveLost("", true), true, true),
+ ("worker lost without shuffle service", SlaveLost("", true), false, true),
+ ("executor failure with shuffle service", ExecutorKilled, true, false),
+ ("executor failure without shuffle service", ExecutorKilled, false, true))
+
+ for ((eventDescription, event, shuffleServiceOn, expectFileLoss) <- shuffleFileLossTests) {
+ val maybeLost = if (expectFileLoss) {
+ "lost"
+ } else {
+ "not lost"
+ }
+ test(s"shuffle files $maybeLost when $eventDescription") {
+ // reset the test context with the right shuffle service config
+ afterEach()
+ val conf = new SparkConf()
+ conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+ init(conf)
+ assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
+
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+ submit(reduceRdd, Array(0))
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostB", 1))))
+ runEvent(ExecutorLost("exec-hostA", event))
+ if (expectFileLoss) {
+ intercept[MetadataFetchFailedException] {
+ mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
+ }
+ } else {
+ assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+ HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+ }
+ }
+ }
// Helper function to validate state when creating tests for task failures
private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
@@ -628,7 +673,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(stageAttempt.stageAttemptId == attempt)
}
-
// Helper functions to extract commonly used code in Fetch Failure test cases
private def setupStageAbortTest(sc: SparkContext) {
sc.listenerBus.addListener(new EndListener())
@@ -1110,7 +1154,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// pretend we were told hostA went away
val oldEpoch = mapOutputTracker.getEpoch
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
@@ -1241,7 +1285,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
))
// then one executor dies, and a task fails in stage 1
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
@@ -1339,7 +1383,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
makeMapStatus("hostA", reduceRdd.partitions.length)))
// now that host goes down
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
// so we resubmit those tasks
runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
@@ -1532,7 +1576,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
submit(reduceRdd, Array(0))
// blockManagerMaster.removeExecutor("exec-hostA")
// pretend we were told hostA went away
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
// rather than marking it is as failed and waiting.
complete(taskSets(0), Seq(
@@ -1999,7 +2043,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// Pretend host A was lost
val oldEpoch = mapOutputTracker.getEpoch
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 36d1c5690f..7d6ad08036 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -46,7 +46,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
override def executorAdded(execId: String, host: String) {}
- override def executorLost(execId: String) {}
+ override def executorLost(execId: String, reason: ExecutorLossReason) {}
override def taskSetFailed(
taskSet: TaskSet,