aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2014-02-06 16:10:48 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-02-06 16:10:48 -0800
commit18ad59e2c6b7bd009e8ba5ebf8fcf99630863029 (patch)
treee8eea9263dc23ab4b4508e0330425cccab0333ef /core/src/test
parent446403b63763157831ddbf6209044efc3cc7bf7c (diff)
downloadspark-18ad59e2c6b7bd009e8ba5ebf8fcf99630863029.tar.gz
spark-18ad59e2c6b7bd009e8ba5ebf8fcf99630863029.tar.bz2
spark-18ad59e2c6b7bd009e8ba5ebf8fcf99630863029.zip
Merge pull request #321 from kayousterhout/ui_kill_fix. Closes #321.
Inform DAG scheduler about all started/finished tasks. Previously, the DAG scheduler was not always informed when tasks started and finished. The simplest example here is for speculated tasks: the DAGScheduler was only told about the first attempt of a task, meaning that SparkListeners were also not told about multiple task attempts, so users can't see what's going on with speculation in the UI. The DAGScheduler also wasn't always told about finished tasks, so in the UI, some tasks will never be shown as finished (this occurs, for example, if a task set gets killed). The other problem is that the fairness accounting was wrong -- the number of running tasks in a pool was decreased when a task set was considered done, even if all of its tasks hadn't yet finished. Author: Kay Ousterhout <kayousterhout@gmail.com> == Merge branch commits == commit c8d547d0f7a17f5a193bef05f5872b9f475675c5 Author: Kay Ousterhout <kayousterhout@gmail.com> Date: Wed Jan 15 16:47:33 2014 -0800 Addressed Reynold's review comments. Always use a TaskEndReason (remove the option), and explicitly signal when we don't know the reason. Also, always tell DAGScheduler (and associated listeners) about started tasks, even when they're speculated. commit 3fee1e2e3c06b975ff7f95d595448f38cce97a04 Author: Kay Ousterhout <kayousterhout@gmail.com> Date: Wed Jan 8 22:58:13 2014 -0800 Fixed broken test and improved logging commit ff12fcaa2567c5d02b75a1d5db35687225bcd46f Author: Kay Ousterhout <kayousterhout@gmail.com> Date: Sun Dec 29 21:08:20 2013 -0800 Inform DAG scheduler about all finished tasks. Previously, the DAG scheduler was not always informed when tasks finished. For example, when a task set was aborted, the DAG scheduler was never told when the tasks in that task set finished. The DAG scheduler was also never told about the completion of speculated tasks. This led to confusion with SparkListeners because information about the completion of those tasks was never passed on to the listeners (so in the UI, for example, some tasks will never be shown as finished). The other problem is that the fairness accounting was wrong -- the number of running tasks in a pool was decreased when a task set was considered done, even if all of its tasks hadn't yet finished.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala41
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala4
3 files changed, 47 insertions, 10 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
index 235d31709a..98ea4cb561 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
@@ -36,22 +36,24 @@ class FakeTaskSetManager(
parent = null
weight = 1
minShare = 2
- runningTasks = 0
priority = initPriority
stageId = initStageId
name = "TaskSet_"+stageId
override val numTasks = initNumTasks
tasksSuccessful = 0
+ var numRunningTasks = 0
+ override def runningTasks = numRunningTasks
+
def increaseRunningTasks(taskNum: Int) {
- runningTasks += taskNum
+ numRunningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
def decreaseRunningTasks(taskNum: Int) {
- runningTasks -= taskNum
+ numRunningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
@@ -77,7 +79,7 @@ class FakeTaskSetManager(
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
- if (tasksSuccessful + runningTasks < numTasks) {
+ if (tasksSuccessful + numRunningTasks < numTasks) {
increaseRunningTasks(1)
Some(new TaskDescription(0, execId, "task 0:0", 0, null))
} else {
@@ -98,7 +100,7 @@ class FakeTaskSetManager(
}
def abort() {
- decreaseRunningTasks(runningTasks)
+ decreaseRunningTasks(numRunningTasks)
parent.removeSchedulable(this)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 1a16e438c4..368c5154ea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -168,6 +168,39 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
assert(listener.endedTasks.contains(TASK_INDEX))
}
+ test("onTaskEnd() should be called for all started tasks, even after job has been killed") {
+ val WAIT_TIMEOUT_MILLIS = 10000
+ val listener = new SaveTaskEvents
+ sc.addSparkListener(listener)
+
+ val numTasks = 10
+ val f = sc.parallelize(1 to 10000, numTasks).map { i => Thread.sleep(10); i }.countAsync()
+ // Wait until one task has started (because we want to make sure that any tasks that are started
+ // have corresponding end events sent to the listener).
+ var finishTime = System.currentTimeMillis + WAIT_TIMEOUT_MILLIS
+ listener.synchronized {
+ var remainingWait = finishTime - System.currentTimeMillis
+ while (listener.startedTasks.isEmpty && remainingWait > 0) {
+ listener.wait(remainingWait)
+ remainingWait = finishTime - System.currentTimeMillis
+ }
+ assert(!listener.startedTasks.isEmpty)
+ }
+
+ f.cancel()
+
+ // Ensure that onTaskEnd is called for all started tasks.
+ finishTime = System.currentTimeMillis + WAIT_TIMEOUT_MILLIS
+ listener.synchronized {
+ var remainingWait = finishTime - System.currentTimeMillis
+ while (listener.endedTasks.size < listener.startedTasks.size && remainingWait > 0) {
+ listener.wait(finishTime - System.currentTimeMillis)
+ remainingWait = finishTime - System.currentTimeMillis
+ }
+ assert(listener.endedTasks.size === listener.startedTasks.size)
+ }
+ }
+
def checkNonZeroAvg(m: Traversable[Long], msg: String) {
assert(m.sum / m.size.toDouble > 0.0, msg)
}
@@ -184,12 +217,14 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
val startedGettingResultTasks = new HashSet[Int]()
val endedTasks = new HashSet[Int]()
- override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
startedTasks += taskStart.taskInfo.index
+ notify()
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- endedTasks += taskEnd.taskInfo.index
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ endedTasks += taskEnd.taskInfo.index
+ notify()
}
override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ecac2f79a2..de321c45b5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -269,7 +269,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
// Tell it the task has finished but the result was lost.
- manager.handleFailedTask(0, TaskState.FINISHED, Some(TaskResultLost))
+ manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost)
assert(sched.endedTasks(0) === TaskResultLost)
// Re-offer the host -- now we should get task 0 again.
@@ -290,7 +290,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(offerResult.isDefined,
"Expect resource offer on iteration %s to return a task".format(index))
assert(offerResult.get.index === 0)
- manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
+ manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
if (index < MAX_TASK_FAILURES) {
assert(!sched.taskSetsFailed.contains(taskSet.id))
} else {