aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala26
1 files changed, 25 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 64e87a95d0..b695aecc13 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -23,6 +23,7 @@ import java.net.URL
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -194,6 +195,10 @@ private[spark] class Executor(
/** Whether this task has been killed. */
@volatile private var killed = false
+ /** Whether this task has been finished. */
+ @GuardedBy("TaskRunner.this")
+ private var finished = false
+
/** How much the JVM process has spent in GC when the task starts to run. */
@volatile var startGCTime: Long = _
@@ -207,10 +212,25 @@ private[spark] class Executor(
logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
killed = true
if (task != null) {
- task.kill(interruptThread)
+ synchronized {
+ if (!finished) {
+ task.kill(interruptThread)
+ }
+ }
}
}
+ /**
+ * Set the finished flag to true and clear the current thread's interrupt status
+ */
+ private def setTaskFinishedAndClearInterruptStatus(): Unit = synchronized {
+ this.finished = true
+ // SPARK-14234 - Reset the interrupted status of the thread to avoid the
+ // ClosedByInterruptException during execBackend.statusUpdate which causes
+ // Executor to crash
+ Thread.interrupted()
+ }
+
override def run(): Unit = {
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
@@ -336,14 +356,17 @@ private[spark] class Executor(
} catch {
case ffe: FetchFailedException =>
val reason = ffe.toTaskEndReason
+ setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
case _: TaskKilledException | _: InterruptedException if task.killed =>
logInfo(s"Executor killed $taskName (TID $taskId)")
+ setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskEndReason
+ setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
case t: Throwable =>
@@ -373,6 +396,7 @@ private[spark] class Executor(
ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
}
}
+ setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
// Don't forcibly exit unless the exception was inherently fatal, to avoid