diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 13 |
1 files changed, 6 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6e3ef0e54f..29341dfe30 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -30,7 +30,6 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState -import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.BlockManagerId @@ -380,17 +379,17 @@ private[spark] class TaskSchedulerImpl( */ override def executorHeartbeatReceived( execId: String, - taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics + accumUpdates: Array[(Long, Seq[AccumulableInfo])], blockManagerId: BlockManagerId): Boolean = { - - val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized { - taskMetrics.flatMap { case (id, metrics) => + // (taskId, stageId, stageAttemptId, accumUpdates) + val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { + accumUpdates.flatMap { case (id, updates) => taskIdToTaskSetManager.get(id).map { taskSetMgr => - (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics) + (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, updates) } } } - dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) + dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId) } def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized { |