about summary refs log tree commit diff
path: root/core/src/main
diff options
context:
space:
mode:
authorMark Grover <grover.markgrover@gmail.com>2015-11-03 08:51:40 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-03 08:51:40 -0800
commitb2e4b314d989de8cad012bbddba703b31d8378a4 (patch)
treebf1bee33004202cd407d83666fc90206873a1a80 /core/src/main
parentf54ff19b1edd4903950cb334987a447445fa97ef (diff)
downloadspark-b2e4b314d989de8cad012bbddba703b31d8378a4.tar.gz
spark-b2e4b314d989de8cad012bbddba703b31d8378a4.tar.bz2
spark-b2e4b314d989de8cad012bbddba703b31d8378a4.zip
[SPARK-9790][YARN] Expose in WebUI if NodeManager is the reason why executors were killed.
Author: Mark Grover <grover.markgrover@gmail.com> Closes #8093 from markgrover/nm2.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala11
6 files changed, 22 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 18278b292f..13241b77bf 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -223,8 +223,10 @@ case class TaskCommitDenied(
* the task crashed the JVM.
*/
@DeveloperApi
-case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true)
- extends TaskFailedReason {
+case class ExecutorLostFailure(
+ execId: String,
+ exitCausedByApp: Boolean = true,
+ reason: Option[String]) extends TaskFailedReason {
override def toErrorString: String = {
val exitBehavior = if (exitCausedByApp) {
"caused by one of the running tasks"
@@ -232,6 +234,8 @@ case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true)
"unrelated to the running tasks"
}
-    s"ExecutorLostFailure (executor ${execId} exited due to an issue ${exitBehavior})"
+ s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})" +
+ reason.map { r => s" Reason: $r" }.getOrElse("")
}
override def countTowardsTaskFailures: Boolean = exitCausedByApp
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index f25710bb5b..623da3e9c1 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -67,7 +67,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
* The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
* method retries, the message handling in the receiver side should be idempotent.
*
- * Note: this is a blocking action which may cost a lot of time, so don't call it in an message
+ * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]].
*
* @param message the message to send
@@ -82,7 +82,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
* retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
* retries, the message handling in the receiver side should be idempotent.
*
- * Note: this is a blocking action which may cost a lot of time, so don't call it in an message
+ * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]].
*
* @param message the message to send
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 9b3fad9012..114468c48c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -802,8 +802,8 @@ private[spark] class TaskSetManager(
case exited: ExecutorExited => exited.exitCausedByApp
case _ => true
}
- handleFailedTask(
- tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp))
+ handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,
+ Some(reason.toString)))
}
// recalculate valid locality levels and waits when executor is lost
recomputeLocality()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 439a119270..ebce5021b1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -125,7 +125,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
-
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -195,7 +194,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
addressToExecutorId
.get(remoteAddress)
- .foreach(removeExecutor(_, SlaveLost("remote Rpc client disassociated")))
+ .foreach(removeExecutor(_, SlaveLost("Remote RPC client disassociated. Likely due to " +
+ "containers exceeding thresholds, or network issues. Check driver logs for WARN " +
+ "messages.")))
}
// Make fake resource offers on just one executor
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index cb24072d7d..d75d6f673e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -175,6 +175,7 @@ private[spark] abstract class YarnSchedulerBackend(
addWebUIFilter(filterName, filterParams, proxyBase)
case RemoveExecutor(executorId, reason) =>
+ logWarning(reason.toString)
removeExecutor(executorId, reason)
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ad6615c112..ee2eb58cf5 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -367,9 +367,10 @@ private[spark] object JsonProtocol {
("Job ID" -> taskCommitDenied.jobID) ~
("Partition ID" -> taskCommitDenied.partitionID) ~
("Attempt Number" -> taskCommitDenied.attemptNumber)
- case ExecutorLostFailure(executorId, exitCausedByApp) =>
+ case ExecutorLostFailure(executorId, exitCausedByApp, reason) =>
("Executor ID" -> executorId) ~
- ("Exit Caused By App" -> exitCausedByApp)
+ ("Exit Caused By App" -> exitCausedByApp) ~
+ ("Loss Reason" -> reason.map(_.toString))
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -812,7 +813,11 @@ private[spark] object JsonProtocol {
case `executorLostFailure` =>
val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
- ExecutorLostFailure(executorId.getOrElse("Unknown"), exitCausedByApp.getOrElse(true))
+ val reason = Utils.jsonOption(json \ "Loss Reason").map(_.extract[String])
+ ExecutorLostFailure(
+ executorId.getOrElse("Unknown"),
+ exitCausedByApp.getOrElse(true),
+ reason)
case `unknownReason` => UnknownReason
}
}