aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/executor/Executor.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/executor/Executor.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala17
1 files changed, 13 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0f99cd9f3b..b75c77b5b4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -108,8 +108,13 @@ private[spark] class Executor(
startDriverHeartbeater()
def launchTask(
- context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
- val tr = new TaskRunner(context, taskId, taskName, serializedTask)
+ context: ExecutorBackend,
+ taskId: Long,
+ attemptNumber: Int,
+ taskName: String,
+ serializedTask: ByteBuffer) {
+ val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
+ serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
@@ -134,7 +139,11 @@ private[spark] class Executor(
private def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
class TaskRunner(
- execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
+ execBackend: ExecutorBackend,
+ val taskId: Long,
+ val attemptNumber: Int,
+ taskName: String,
+ serializedTask: ByteBuffer)
extends Runnable {
@volatile private var killed = false
@@ -180,7 +189,7 @@ private[spark] class Executor(
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
- val value = task.run(taskId.toInt)
+ val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
val taskFinish = System.currentTimeMillis()
// If the task has been killed, let's fail it.