about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
author: xutingjun <xutingjun@huawei.com> 2015-08-11 23:19:35 -0700
committer: Andrew Or <andrew@databricks.com> 2015-08-11 23:19:35 -0700
commit: b85f9a242a12e8096e331fa77d5ebd16e93c844d (patch)
tree: dabfeff8005e90d79587fee88f57e0c74d3bbda0 /core
parent: b1581ac28840a4d2209ef8bb5c9f8700b4c1b286 (diff)
download: spark-b85f9a242a12e8096e331fa77d5ebd16e93c844d.tar.gz
spark-b85f9a242a12e8096e331fa77d5ebd16e93c844d.tar.bz2
spark-b85f9a242a12e8096e331fa77d5ebd16e93c844d.zip
[SPARK-8366] maxNumExecutorsNeeded should properly handle failed tasks
Author: xutingjun <xutingjun@huawei.com>
Author: meiyoula <1039320815@qq.com>

Closes #6817 from XuTingjun/SPARK-8366.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala      | 22
-rw-r--r-- core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 22
2 files changed, 34 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 1877aaf2ca..b93536e653 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -599,14 +599,8 @@ private[spark] class ExecutorAllocationManager(
// If this is the last pending task, mark the scheduler queue as empty
stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
- val numTasksScheduled = stageIdToTaskIndices(stageId).size
- val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
- if (numTasksScheduled == numTasksTotal) {
- // No more pending tasks for this stage
- stageIdToNumTasks -= stageId
- if (stageIdToNumTasks.isEmpty) {
- allocationManager.onSchedulerQueueEmpty()
- }
+ if (totalPendingTasks() == 0) {
+ allocationManager.onSchedulerQueueEmpty()
}
// Mark the executor on which this task is scheduled as busy
@@ -618,6 +612,8 @@ private[spark] class ExecutorAllocationManager(
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId
+ val taskIndex = taskEnd.taskInfo.index
+ val stageId = taskEnd.stageId
allocationManager.synchronized {
numRunningTasks -= 1
// If the executor is no longer running any scheduled tasks, mark it as idle
@@ -628,6 +624,16 @@ private[spark] class ExecutorAllocationManager(
allocationManager.onExecutorIdle(executorId)
}
}
+
+ // If the task failed, we expect it to be resubmitted later. To ensure we have
+ // enough resources to run the resubmitted task, we need to mark the scheduler
+ // as backlogged again if it's not already marked as such (SPARK-8366)
+ if (taskEnd.reason != Success) {
+ if (totalPendingTasks() == 0) {
+ allocationManager.onSchedulerBacklogged()
+ }
+ stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) }
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 34caca8928..f374f97f87 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -206,8 +206,8 @@ class ExecutorAllocationManagerSuite
val task2Info = createTaskInfo(1, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
- sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task1Info, null))
- sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task2Info, null))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null))
assert(adjustRequestedExecutors(manager) === -1)
}
@@ -787,6 +787,24 @@ class ExecutorAllocationManagerSuite
Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2))
}
+ test("SPARK-8366: maxNumExecutorsNeeded should properly handle failed tasks") {
+ sc = createSparkContext()
+ val manager = sc.executorAllocationManager.get
+ assert(maxNumExecutorsNeeded(manager) === 0)
+
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+ assert(maxNumExecutorsNeeded(manager) === 1)
+
+ val taskInfo = createTaskInfo(1, 1, "executor-1")
+ sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
+ assert(maxNumExecutorsNeeded(manager) === 1)
+
+ // If the task is failed, we expect it to be resubmitted later.
+ val taskEndReason = ExceptionFailure(null, null, null, null, null)
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null))
+ assert(maxNumExecutorsNeeded(manager) === 1)
+ }
+
private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,