author     Imran Rashid <irashid@cloudera.com>    2016-06-27 16:38:03 -0500
committer  Imran Rashid <irashid@cloudera.com>    2016-06-27 16:38:03 -0500
commit     c15b552dd547a129c7f0d082dab4eebbd64bee02 (patch)
tree       2ecd7a9391e0ac079302712aafb47f6bc4e88200 /core
parent     282158914d89b35a3f85388cb20bd62215f4f589 (diff)
[SPARK-16106][CORE] TaskSchedulerImpl should properly track executors added to existing hosts
## What changes were proposed in this pull request?

TaskSchedulerImpl used to set `newExecAvail` only when a new *host* was added, not when a new executor was added to an existing host. It also didn't update some internal state tracking live executors until a task was scheduled on the executor. This patch changes it to update that state as soon as it learns about a new executor.

## How was this patch tested?

Added a unit test; ran everything via Jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13826 from squito/SPARK-16106_executorByHosts.
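To make the behavioral change concrete, here is a minimal, self-contained model of the fixed bookkeeping. This is a sketch only: `ExecutorTrackingSketch`, `Offer`, and `trackOffers` are hypothetical stand-ins for `TaskSchedulerImpl`, `WorkerOffer`, and the offer loop in `resourceOffers`; the field names mirror the diff below.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Minimal model of the patched executor tracking in TaskSchedulerImpl.
// ExecutorTrackingSketch and Offer are illustrative stand-ins, not Spark code.
object ExecutorTrackingSketch {
  case class Offer(executorId: String, host: String)

  val executorsByHost = new HashMap[String, HashSet[String]]
  val executorIdToHost = new HashMap[String, String]
  val executorIdToTaskCount = new HashMap[String, Int]

  // Returns true iff at least one previously unseen executor appeared,
  // mirroring the `newExecAvail` flag in resourceOffers.
  def trackOffers(offers: Seq[Offer]): Boolean = {
    var newExecAvail = false
    for (o <- offers) {
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
      }
      if (!executorIdToTaskCount.contains(o.executorId)) {
        // Key change: a new executor is registered even when its host is
        // already known, and before any task is scheduled on it.
        executorsByHost(o.host) += o.executorId
        executorIdToHost(o.executorId) = o.host
        executorIdToTaskCount(o.executorId) = 0
        newExecAvail = true
      }
    }
    newExecAvail
  }

  def main(args: Array[String]): Unit = {
    assert(trackOffers(Seq(Offer("executor0", "host0"))))  // new host
    assert(trackOffers(Seq(Offer("executor2", "host0"))))  // new executor, existing host
    assert(!trackOffers(Seq(Offer("executor0", "host0")))) // nothing new
    assert(executorsByHost("host0") == HashSet("executor0", "executor2"))
  }
}
```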
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala      |   8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 168
2 files changed, 111 insertions(+), 65 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4282606589..821e3ee7f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -266,7 +266,6 @@ private[spark] class TaskSchedulerImpl(
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToTaskCount(execId) += 1
- executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
@@ -293,11 +292,14 @@ private[spark] class TaskSchedulerImpl(
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
- executorIdToHost(o.executorId) = o.host
- executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
+ }
+ if (!executorIdToTaskCount.contains(o.executorId)) {
+ executorsByHost(o.host) += o.executorId
executorAdded(o.executorId, o.host)
+ executorIdToHost(o.executorId) = o.host
+ executorIdToTaskCount(o.executorId) = 0
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
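For readability, the patched loop assembled from the `+` and context lines above reads roughly as follows. This is an excerpt of the method body, not a standalone program, and the rack-tracking body is elided here just as it is in the diff.

```scala
// Consolidated view of the patched offer loop in resourceOffers
// (assembled from the hunk above; surrounding method omitted).
for (o <- offers) {
  if (!executorsByHost.contains(o.host)) {
    executorsByHost(o.host) = new HashSet[String]()
  }
  if (!executorIdToTaskCount.contains(o.executorId)) {
    // First sighting of this executor: update all tracking state now,
    // whether or not its host was already known.
    executorsByHost(o.host) += o.executorId
    executorAdded(o.executorId, o.host)      // DAGScheduler callback
    executorIdToHost(o.executorId) = o.host
    executorIdToTaskCount(o.executorId) = 0
    newExecAvail = true                      // lets TaskSetManagers recompute locality
  }
  for (rack <- getRackForHost(o.host)) {
    // ... rack bookkeeping, unchanged by this patch
  }
}
```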
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a09a602d13..34b8d55339 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import org.scalatest.BeforeAndAfterEach
+
import org.apache.spark._
import org.apache.spark.internal.Logging
@@ -27,18 +29,63 @@ class FakeSchedulerBackend extends SchedulerBackend {
def defaultParallelism(): Int = 1
}
-class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
+class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
+ with Logging {
- test("Scheduler does not always schedule tasks on the same workers") {
+
+ var failedTaskSetException: Option[Throwable] = None
+ var failedTaskSetReason: String = null
+ var failedTaskSet = false
+
+ var taskScheduler: TaskSchedulerImpl = null
+ var dagScheduler: DAGScheduler = null
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ failedTaskSet = false
+ failedTaskSetException = None
+ failedTaskSetReason = null
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+ if (taskScheduler != null) {
+ taskScheduler.stop()
+ taskScheduler = null
+ }
+ if (dagScheduler != null) {
+ dagScheduler.stop()
+ dagScheduler = null
+ }
+ }
+
+ def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
- val taskScheduler = new TaskSchedulerImpl(sc)
+ confs.foreach { case (k, v) =>
+ sc.conf.set(k, v)
+ }
+ taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
- new DAGScheduler(sc, taskScheduler) {
- override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
- override def executorAdded(execId: String, host: String) {}
+ dagScheduler = new DAGScheduler(sc, taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+ override def executorAdded(execId: String, host: String): Unit = {}
+ override def taskSetFailed(
+ taskSet: TaskSet,
+ reason: String,
+ exception: Option[Throwable]): Unit = {
+ // Normally the DAGScheduler puts this in the event loop, which will eventually fail
+ // dependent jobs
+ failedTaskSet = true
+ failedTaskSetReason = reason
+ failedTaskSetException = exception
+ }
}
+ taskScheduler
+ }
+ test("Scheduler does not always schedule tasks on the same workers") {
+ val taskScheduler = setupScheduler()
val numFreeCores = 1
val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
new WorkerOffer("executor1", "host1", numFreeCores))
@@ -58,20 +105,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
assert(count > 0)
assert(count < numTrials)
+ assert(!failedTaskSet)
}
test("Scheduler correctly accounts for multiple CPUs per task") {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskCpus = 2
-
- sc.conf.set("spark.task.cpus", taskCpus.toString)
- val taskScheduler = new TaskSchedulerImpl(sc)
- taskScheduler.initialize(new FakeSchedulerBackend)
- // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
- new DAGScheduler(sc, taskScheduler) {
- override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
- override def executorAdded(execId: String, host: String) {}
- }
+ val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
// Give zero core offers. Should not generate any tasks
val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
new WorkerOffer("executor1", "host1", 0))
@@ -96,22 +135,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(1 === taskDescriptions.length)
assert("executor0" === taskDescriptions(0).executorId)
+ assert(!failedTaskSet)
}
test("Scheduler does not crash when tasks are not serializable") {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskCpus = 2
-
- sc.conf.set("spark.task.cpus", taskCpus.toString)
- val taskScheduler = new TaskSchedulerImpl(sc)
- taskScheduler.initialize(new FakeSchedulerBackend)
- // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
- val dagScheduler = new DAGScheduler(sc, taskScheduler) {
- override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
- override def executorAdded(execId: String, host: String) {}
- }
+ val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
val numFreeCores = 1
- taskScheduler.setDAGScheduler(dagScheduler)
val taskSet = new TaskSet(
Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
@@ -119,26 +149,20 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
taskScheduler.submitTasks(taskSet)
var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(0 === taskDescriptions.length)
+ assert(failedTaskSet)
+ assert(failedTaskSetReason.contains("Failed to serialize task"))
// Now check that we can still submit tasks
- // Even if one of the tasks has not-serializable tasks, the other task set should
+ // Even if one of the task sets has not-serializable tasks, the other task set should
// still be processed without error
- taskScheduler.submitTasks(taskSet)
taskScheduler.submitTasks(FakeTask.createTaskSet(1))
+ taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
}
test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
- val taskScheduler = new TaskSchedulerImpl(sc)
- taskScheduler.initialize(new FakeSchedulerBackend)
- // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
- val dagScheduler = new DAGScheduler(sc, taskScheduler) {
- override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
- override def executorAdded(execId: String, host: String) {}
- }
- taskScheduler.setDAGScheduler(dagScheduler)
+ val taskScheduler = setupScheduler()
val attempt1 = FakeTask.createTaskSet(1, 0)
val attempt2 = FakeTask.createTaskSet(1, 1)
taskScheduler.submitTasks(attempt1)
@@ -153,17 +177,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
.get.isZombie = true
taskScheduler.submitTasks(attempt3)
+ assert(!failedTaskSet)
}
test("don't schedule more tasks after a taskset is zombie") {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
- val taskScheduler = new TaskSchedulerImpl(sc)
- taskScheduler.initialize(new FakeSchedulerBackend)
- // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
- new DAGScheduler(sc, taskScheduler) {
- override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
- override def executorAdded(execId: String, host: String) {}
- }
+ val taskScheduler = setupScheduler()
val numFreeCores = 1
val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
@@ -191,17 +209,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
assert(1 === taskDescriptions3.length)
val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
assert(mgr.taskSet.stageAttemptId === 1)
+ assert(!failedTaskSet)
}
test("if a zombie attempt finishes, continue scheduling tasks for non-zombie attempts") {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
- val taskScheduler = new TaskSchedulerImpl(sc)
- taskScheduler.initialize(new FakeSchedulerBackend)
- // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
- new DAGScheduler(sc, taskScheduler) {
- override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
- override def executorAdded(execId: String, host: String) {}
- }
+ val taskScheduler = setupScheduler()
val numFreeCores = 10
val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
@@ -236,17 +248,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
assert(mgr.taskSet.stageAttemptId === 1)
}
+ assert(!failedTaskSet)
}
test("tasks are not re-scheduled while executor loss reason is pending") {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
- val taskScheduler = new TaskSchedulerImpl(sc)
- taskScheduler.initialize(new FakeSchedulerBackend)
- // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
- new DAGScheduler(sc, taskScheduler) {
- override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
- override def executorAdded(execId: String, host: String) {}
- }
+ val taskScheduler = setupScheduler()
val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
@@ -272,6 +278,44 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
assert(1 === taskDescriptions3.length)
assert("executor1" === taskDescriptions3(0).executorId)
+ assert(!failedTaskSet)
}
+ test("SPARK-16106 locality levels updated if executor added to existing host") {
+ val taskScheduler = setupScheduler()
+
+ taskScheduler.submitTasks(FakeTask.createTaskSet(2, 0,
+ (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2"))}: _*
+ ))
+
+ val taskDescs = taskScheduler.resourceOffers(Seq(
+ new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1)
+ )).flatten
+ // only schedule one task because of locality
+ assert(taskDescs.size === 1)
+
+ val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get
+ assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY))
+ // we should know about both executors, even though we only scheduled tasks on one of them
+ assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0")))
+ assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1")))
+
+ // when executor2 is added, we should realize that we can run process-local tasks.
+ // And we should know it's alive on the host.
+ val secondTaskDescs = taskScheduler.resourceOffers(
+ Seq(new WorkerOffer("executor2", "host0", 1))).flatten
+ assert(secondTaskDescs.size === 1)
+ assert(mgr.myLocalityLevels.toSet ===
+ Set(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY))
+ assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0", "executor2")))
+ assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1")))
+
+ // And even if we don't have anything left to schedule, another resource offer on yet another
+ // executor should also update the set of live executors
+ val thirdTaskDescs = taskScheduler.resourceOffers(
+ Seq(new WorkerOffer("executor3", "host1", 1))).flatten
+ assert(thirdTaskDescs.size === 0)
+ assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
+ }
}
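As a usage note, the refactored `setupScheduler` helper makes further regression tests cheap to add. A hypothetical follow-on test (the test name and the `executor0`/`host0` values are invented for illustration, but all helper names come from the diff above) could verify that tracking happens even when no task can launch:

```scala
test("hypothetical: executors are tracked even when an offer has no free cores") {
  val taskScheduler = setupScheduler()
  // Zero free cores: nothing can be scheduled, but the offer loop should
  // still record the executor as alive on its host.
  val taskDescs = taskScheduler.resourceOffers(
    Seq(new WorkerOffer("executor0", "host0", 0))).flatten
  assert(taskDescs.isEmpty)
  assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0")))
}
```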