author     jinxing <jinxing6042@126.com>         2017-03-09 10:56:19 -0800
committer  Marcelo Vanzin <vanzin@cloudera.com>  2017-03-09 10:56:19 -0800
commit     3232e54f2fcb8d2072cba4bc763ef29d5d8d325f (patch)
tree       b0b6aac4539f8be1acdc5fc9a0b784611c61b305
parent     b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9 (diff)
[SPARK-19793] Use clock.getTimeMillis when marking a task as finished in TaskSetManager.
## What changes were proposed in this pull request?

`TaskSetManager` currently uses `System.currentTimeMillis` when marking a task as finished in `handleSuccessfulTask` and `handleFailedTask`, so a developer cannot control a task's finish time in unit tests. In `handleSuccessfulTask`, a task's duration is computed as `System.currentTimeMillis` minus the launch time (which can be set via `clock`); mixing the two time sources makes the result incorrect. This patch passes `clock.getTimeMillis()` explicitly instead.

## How was this patch tested?

Existing tests.

Author: jinxing <jinxing6042@126.com>

Closes #17133 from jinxing64/SPARK-19793.
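For illustration, a minimal, self-contained sketch of the idea behind the fix: taking the finish timestamp from an injectable clock, rather than from `System.currentTimeMillis`, makes task durations deterministic under test. The `Clock`, `SystemClock`, and `ManualClock` below are simplified stand-ins for Spark's real classes in `org.apache.spark.util`, and `ClockDemo` is purely hypothetical.

```scala
// Simplified stand-ins, not Spark's actual org.apache.spark.util classes.
trait Clock { def getTimeMillis(): Long }

class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

class ManualClock(private var now: Long = 0L) extends Clock {
  def advance(ms: Long): Unit = { now += ms }
  override def getTimeMillis(): Long = now
}

object ClockDemo extends App {
  val clock = new ManualClock()
  clock.advance(100)
  val launchTime = clock.getTimeMillis()   // "launch" at t = 100
  clock.advance(50)                        // the task "runs" for exactly 50 ms
  val finishTime = clock.getTimeMillis()   // finish at t = 150, not wall time
  assert(finishTime - launchTime == 50)
}
```

With `System.currentTimeMillis` in the finish path, the duration would depend on wall-clock time even when the launch time came from a `ManualClock`; that mismatch is exactly what the patch removes.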
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala            |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala             |  2
4 files changed, 17 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 59680139e7..9843eab4f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -70,11 +70,13 @@ class TaskInfo(
var killed = false
- private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
+ private[spark] def markGettingResult(time: Long) {
gettingResultTime = time
}
- private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
+ private[spark] def markFinished(state: TaskState, time: Long) {
+ // finishTime should be set larger than 0, otherwise "finished" below will return false.
+ assert(time > 0)
finishTime = time
if (state == TaskState.FAILED) {
failed = true
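The `assert(time > 0)` added above guards an invariant that is easy to miss: `TaskInfo` treats a `finishTime` of 0 as "never finished". A hypothetical reduction of that check (the field and method names mirror `TaskInfo`, but this is not the real class):

```scala
class MiniTaskInfo {
  var finishTime: Long = 0L

  // Zero is the sentinel for "not finished yet", which is why markFinished
  // must reject time == 0: a fresh ManualClock returns 0, and a zero
  // timestamp would leave the task looking permanently unfinished.
  def finished: Boolean = finishTime != 0

  def markFinished(time: Long): Unit = {
    assert(time > 0)
    finishTime = time
  }
}
```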
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 19ebaf817e..11633bef3c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -667,7 +667,7 @@ private[spark] class TaskSetManager(
*/
def handleTaskGettingResult(tid: Long): Unit = {
val info = taskInfos(tid)
- info.markGettingResult()
+ info.markGettingResult(clock.getTimeMillis())
sched.dagScheduler.taskGettingResult(info)
}
@@ -695,7 +695,7 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
- info.markFinished(TaskState.FINISHED)
+ info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
removeRunningTask(tid)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
@@ -739,7 +739,7 @@ private[spark] class TaskSetManager(
return
}
removeRunningTask(tid)
- info.markFinished(state)
+ info.markFinished(state, clock.getTimeMillis())
val index = info.index
copiesRunning(index) -= 1
var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 2c2cda9f31..f36bcd8504 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -192,6 +192,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
+ clock.advance(1)
// Tell it the task has finished
manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
assert(sched.endedTasks(0) === Success)
@@ -377,6 +378,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
+ clock.advance(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -394,6 +396,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
+ clock.advance(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
@@ -427,6 +430,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
val clock = new ManualClock
+ clock.advance(1)
// We don't directly use the application blacklist, but its presence triggers blacklisting
// within the taskset.
val mockListenerBus = mock(classOf[LiveListenerBus])
@@ -551,7 +555,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host1", "execB")),
Seq(TaskLocation("host2", "execC")),
Seq())
- val manager = new TaskSetManager(sched, taskSet, 1, clock = new ManualClock)
+ val clock = new ManualClock()
+ clock.advance(1)
+ val manager = new TaskSetManager(sched, taskSet, 1, clock = clock)
sched.addExecutor("execA", "host1")
manager.executorAdded()
sched.addExecutor("execC", "host2")
@@ -904,6 +910,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
assert(task.executorId === k)
}
assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+ clock.advance(1)
// Complete the 3 tasks and leave 1 task in running
for (id <- Set(0, 1, 2)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
@@ -961,6 +968,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
tasks += task
}
assert(sched.startedTasks.toSet === (0 until 5).toSet)
+ clock.advance(1)
// Complete 3 tasks and leave 2 tasks in running
for (id <- Set(0, 1, 2)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
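The `clock.advance(1)` calls added throughout this suite all exist for the same reason: a fresh `ManualClock` reads 0, and `markFinished` now asserts a positive timestamp. A standalone sketch of the pattern, using an illustrative stand-in for `org.apache.spark.util.ManualClock`:

```scala
object AdvanceDemo extends App {
  // Stand-in mirroring ManualClock's advance/getTimeMillis surface.
  class ManualClock(private var now: Long = 0L) {
    def advance(ms: Long): Unit = { now += ms }
    def getTimeMillis(): Long = now
  }

  val clock = new ManualClock()
  // Without this advance, getTimeMillis() returns 0, and a task finished
  // at that instant would trip markFinished's assert(time > 0).
  clock.advance(1)
  assert(clock.getTimeMillis() > 0)
}
```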
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 11482d187a..38030e0660 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -77,7 +77,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
- taskInfo.markFinished(TaskState.FINISHED)
+ taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
val taskMetrics = TaskMetrics.empty
taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
jobListener.onTaskEnd(