author     Devaraj K <devaraj@apache.org>            2016-05-03 13:25:28 -0700
committer  Shixiong Zhu <shixiong@databricks.com>    2016-05-03 13:25:28 -0700
commit     659f635d3bd0c0d025bf514dfb1747ed7386ba45 (patch)
tree       9f566602f13d62487ec479f45286a7d01c59b790 /core
parent     f5623b460224ce363316c63f5d28947215078fc5 (diff)
[SPARK-14234][CORE] Executor crashes for TaskRunner thread interruption
## What changes were proposed in this pull request?

Reset the task's thread interrupt status before updating the task status, so that a pending interrupt from a kill request cannot make the status update itself fail.

## How was this patch tested?

Verified manually by running multiple applications: with this patch, the executor no longer crashes and reports the task status to the driver without any exceptions.

Author: Devaraj K <devaraj@apache.org>

Closes #12031 from devaraj-kavali/SPARK-14234.
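For context, here is a minimal, self-contained repro of the failure mode (hypothetical names, not part of the patch): an interruptible java.nio channel refuses to do I/O once the calling thread's interrupt flag is set, failing with ClosedByInterruptException. This is how an interrupted TaskRunner thread could crash the executor's status-update path.

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.ClosedByInterruptException

// Hypothetical repro, not Spark code: an NIO channel operation performed
// while the calling thread's interrupt flag is set throws
// ClosedByInterruptException and closes the channel as a side effect.
object ClosedByInterruptRepro {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("repro", ".bin")
    file.deleteOnExit()
    val channel = new RandomAccessFile(file, "rw").getChannel

    // Simulate task.kill(interruptThread = true) landing on this thread.
    Thread.currentThread().interrupt()

    try {
      channel.write(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
    } catch {
      case e: ClosedByInterruptException =>
        // In the executor, this kind of failure surfaced during
        // execBackend.statusUpdate and brought the whole process down.
        println(s"write failed: $e")
    }
  }
}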
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/Executor.scala | 26
1 file changed, 25 insertions(+), 1 deletion(-)
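The core of the fix is Thread.interrupted(), which both reads and clears the current thread's interrupt flag, unlike isInterrupted, which only reads it. A minimal sketch of that semantics (assumed demo names, independent of the diff below):

// Minimal sketch of the read-and-clear behavior the patch relies on.
object InterruptClearDemo {
  def main(args: Array[String]): Unit = {
    // Simulate a kill request interrupting the task thread.
    Thread.currentThread().interrupt()
    println(Thread.currentThread().isInterrupted) // true: NIO writes would now fail
    println(Thread.interrupted())                 // true, and clears the flag
    println(Thread.currentThread().isInterrupted) // false: safe to report task status
  }
}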
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 64e87a95d0..b695aecc13 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -23,6 +23,7 @@ import java.net.URL
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -194,6 +195,10 @@ private[spark] class Executor(
/** Whether this task has been killed. */
@volatile private var killed = false
+ /** Whether this task has been finished. */
+ @GuardedBy("TaskRunner.this")
+ private var finished = false
+
/** How much the JVM process has spent in GC when the task starts to run. */
@volatile var startGCTime: Long = _
@@ -207,10 +212,25 @@ private[spark] class Executor(
logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
killed = true
if (task != null) {
- task.kill(interruptThread)
+ synchronized {
+ if (!finished) {
+ task.kill(interruptThread)
+ }
+ }
}
}
+ /**
+ * Set the finished flag to true and clear the current thread's interrupt status
+ */
+ private def setTaskFinishedAndClearInterruptStatus(): Unit = synchronized {
+ this.finished = true
+ // SPARK-14234 - Reset the interrupted status of the thread to avoid the
+ // ClosedByInterruptException during execBackend.statusUpdate which causes
+ // Executor to crash
+ Thread.interrupted()
+ }
+
override def run(): Unit = {
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
@@ -336,14 +356,17 @@ private[spark] class Executor(
} catch {
case ffe: FetchFailedException =>
val reason = ffe.toTaskEndReason
+ setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
case _: TaskKilledException | _: InterruptedException if task.killed =>
logInfo(s"Executor killed $taskName (TID $taskId)")
+ setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskEndReason
+ setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
case t: Throwable =>
@@ -373,6 +396,7 @@ private[spark] class Executor(
ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
}
}
+ setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
// Don't forcibly exit unless the exception was inherently fatal, to avoid