diff options
author | Devaraj K <devaraj@apache.org> | 2016-05-03 13:25:28 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-05-03 13:25:28 -0700 |
commit | 659f635d3bd0c0d025bf514dfb1747ed7386ba45 (patch) | |
tree | 9f566602f13d62487ec479f45286a7d01c59b790 /core | |
parent | f5623b460224ce363316c63f5d28947215078fc5 (diff) | |
download | spark-659f635d3bd0c0d025bf514dfb1747ed7386ba45.tar.gz spark-659f635d3bd0c0d025bf514dfb1747ed7386ba45.tar.bz2 spark-659f635d3bd0c0d025bf514dfb1747ed7386ba45.zip |
[SPARK-14234][CORE] Executor crashes for TaskRunner thread interruption
## What changes were proposed in this pull request?
Resetting the task interruption status before updating the task status.
## How was this patch tested?
I have verified it manually by running multiple applications, Executor doesn't crash and updates the status to the driver without any exceptions with the patch changes.
Author: Devaraj K <devaraj@apache.org>
Closes #12031 from devaraj-kavali/SPARK-14234.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/Executor.scala | 26 |
1 files changed, 25 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 64e87a95d0..b695aecc13 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -23,6 +23,7 @@ import java.net.URL import java.nio.ByteBuffer import java.util.Properties import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} @@ -194,6 +195,10 @@ private[spark] class Executor( /** Whether this task has been killed. */ @volatile private var killed = false + /** Whether this task has been finished. */ + @GuardedBy("TaskRunner.this") + private var finished = false + /** How much the JVM process has spent in GC when the task starts to run. */ @volatile var startGCTime: Long = _ @@ -207,10 +212,25 @@ private[spark] class Executor( logInfo(s"Executor is trying to kill $taskName (TID $taskId)") killed = true if (task != null) { - task.kill(interruptThread) + synchronized { + if (!finished) { + task.kill(interruptThread) + } + } } } + /** + * Set the finished flag to true and clear the current thread's interrupt status + */ + private def setTaskFinishedAndClearInterruptStatus(): Unit = synchronized { + this.finished = true + // SPARK-14234 - Reset the interrupted status of the thread to avoid the + // ClosedByInterruptException during execBackend.statusUpdate which causes + // Executor to crash + Thread.interrupted() + } + override def run(): Unit = { val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId) val deserializeStartTime = System.currentTimeMillis() @@ -336,14 +356,17 @@ private[spark] class Executor( } catch { case ffe: FetchFailedException => val reason = ffe.toTaskEndReason + setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) case _: TaskKilledException | _: InterruptedException if task.killed => logInfo(s"Executor killed $taskName (TID $taskId)") + setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) case CausedBy(cDE: CommitDeniedException) => val reason = cDE.toTaskEndReason + setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) case t: Throwable => @@ -373,6 +396,7 @@ private[spark] class Executor( ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) } } + setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) // Don't forcibly exit unless the exception was inherently fatal, to avoid |