author     Marcelo Vanzin <vanzin@cloudera.com>    2015-11-19 16:49:18 -0800
committer  Andrew Or <andrew@databricks.com>       2015-11-19 16:49:18 -0800
commit     880128f37e1bc0b9d98d1786670be62a06c648f2 (patch)
tree       1dce0fc2622ebe9d54ca5d70dd6e2f0277213824
parent     37cff1b1a79cad11277612cb9bc8bc2365cf5ff2 (diff)
download   spark-880128f37e1bc0b9d98d1786670be62a06c648f2.tar.gz
           spark-880128f37e1bc0b9d98d1786670be62a06c648f2.tar.bz2
           spark-880128f37e1bc0b9d98d1786670be62a06c648f2.zip
[SPARK-4134][CORE] Lower severity of some executor loss logs.

Don't log ERROR messages when executors are explicitly killed or when the exit
reason is not yet known.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9780 from vanzin/SPARK-11789.
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala                    |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala                     | 44
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala                        |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 18
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                       |  4
5 files changed, 45 insertions(+), 24 deletions(-)
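
Before the diff itself, a standalone Scala sketch of the idea behind the patch (illustrative names only, not the actual org.apache.spark.scheduler classes): the log severity for an executor loss is derived from the loss reason, so driver-initiated kills and still-pending reasons are no longer reported at ERROR.

sealed trait LossReason { def message: String }
case object KilledByDriver extends LossReason { val message = "Executor killed by driver." }
case object ReasonPending extends LossReason { val message = "Loss reason not yet known." }
final case class Exited(exitCode: Int, exitCausedByApp: Boolean, message: String) extends LossReason

object LossLogLevelSketch {
  // Mirrors the intent of the new logExecutorLoss(): only unexpected losses warrant an ERROR.
  def severityFor(reason: LossReason): String = reason match {
    case ReasonPending  => "DEBUG"
    case KilledByDriver => "INFO"
    case _              => "ERROR"
  }

  def main(args: Array[String]): Unit = {
    val reasons = Seq(
      ReasonPending,
      KilledByDriver,
      Exited(exitCode = 137, exitCausedByApp = false, message = "Container killed"))
    reasons.foreach(r => println(s"${severityFor(r)} ${r.message}"))
  }
}
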
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 47a5cbff49..7e1197d742 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -40,6 +40,8 @@ private[spark] object ExecutorExited {
}
}
+private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed by driver.")
+
/**
* A loss reason that means we don't yet know why the executor exited.
*
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bf0419db1f..bdf19f9f27 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -470,25 +470,25 @@ private[spark] class TaskSchedulerImpl(
synchronized {
if (executorIdToTaskCount.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
- logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
+ logExecutorLoss(executorId, hostPort, reason)
removeExecutor(executorId, reason)
failedExecutor = Some(executorId)
} else {
- executorIdToHost.get(executorId) match {
- case Some(_) =>
- // If the host mapping still exists, it means we don't know the loss reason for the
- // executor. So call removeExecutor() to update tasks running on that executor when
- // the real loss reason is finally known.
- logError(s"Actual reason for lost executor $executorId: ${reason.message}")
- removeExecutor(executorId, reason)
-
- case None =>
- // We may get multiple executorLost() calls with different loss reasons. For example,
- // one may be triggered by a dropped connection from the slave while another may be a
- // report of executor termination from Mesos. We produce log messages for both so we
- // eventually report the termination reason.
- logError("Lost an executor " + executorId + " (already removed): " + reason)
- }
+ executorIdToHost.get(executorId) match {
+ case Some(hostPort) =>
+ // If the host mapping still exists, it means we don't know the loss reason for the
+ // executor. So call removeExecutor() to update tasks running on that executor when
+ // the real loss reason is finally known.
+ logExecutorLoss(executorId, hostPort, reason)
+ removeExecutor(executorId, reason)
+
+ case None =>
+ // We may get multiple executorLost() calls with different loss reasons. For example,
+ // one may be triggered by a dropped connection from the slave while another may be a
+ // report of executor termination from Mesos. We produce log messages for both so we
+ // eventually report the termination reason.
+ logError(s"Lost an executor $executorId (already removed): $reason")
+ }
}
}
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
@@ -498,6 +498,18 @@ private[spark] class TaskSchedulerImpl(
}
}
+ private def logExecutorLoss(
+ executorId: String,
+ hostPort: String,
+ reason: ExecutorLossReason): Unit = reason match {
+ case LossReasonPending =>
+ logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.")
+ case ExecutorKilled =>
+ logInfo(s"Executor $executorId on $hostPort killed by driver.")
+ case _ =>
+ logError(s"Lost executor $executorId on $hostPort: $reason")
+ }
+
/**
* Remove an executor from all our data structures and mark it as lost. If the executor's loss
* reason is not yet known, do not yet remove its association with its host nor update the status
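
As the comments in the hunks above explain, executorLost() handles three situations: the executor still has running tasks, its host mapping exists but the real loss reason was pending, or it has already been removed. A minimal standalone sketch of that bookkeeping (the map names are simplified stand-ins for the scheduler's fields, and the returned strings merely describe the action taken):

import scala.collection.mutable

object ExecutorLossBookkeepingSketch {
  private val executorIdToTaskCount = mutable.HashMap("exec-1" -> 3)
  private val executorIdToHost =
    mutable.HashMap("exec-1" -> "host-a:7337", "exec-2" -> "host-b:7337")

  // Roughly parallels the branches in TaskSchedulerImpl.executorLost() above.
  def handleLoss(executorId: String): String =
    if (executorIdToTaskCount.contains(executorId)) {
      "active executor: log per reason, remove it, and fail its running tasks"
    } else if (executorIdToHost.contains(executorId)) {
      "loss reason was pending: the real reason is now known, finish removal"
    } else {
      "already removed: just log the (possibly duplicate) reason"
    }

  def main(args: Array[String]): Unit =
    Seq("exec-1", "exec-2", "exec-3").foreach(id => println(s"$id -> ${handleLoss(id)}"))
}
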
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 114468c48c..a02f3017cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -800,6 +800,7 @@ private[spark] class TaskSetManager(
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
val exitCausedByApp: Boolean = reason match {
case exited: ExecutorExited => exited.exitCausedByApp
+ case ExecutorKilled => false
case _ => true
}
handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,
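
The one-line TaskSetManager change above feeds into whether a lost task counts against the application. A hedged sketch of that decision with simplified reason types (not Spark's actual hierarchy): a driver-initiated kill should not be charged to the app, an explicit exit carries its own flag, and anything else is assumed to be app-caused.

sealed trait Reason
case object DriverKill extends Reason
final case class ContainerExited(exitCausedByApp: Boolean) extends Reason
case object UnknownLoss extends Reason

object FailureAccountingSketch {
  // Parallels the exitCausedByApp match added in TaskSetManager.executorLost() above.
  def countsTowardFailureLimit(reason: Reason): Boolean = reason match {
    case ContainerExited(causedByApp) => causedByApp
    case DriverKill                   => false
    case _                            => true
  }

  def main(args: Array[String]): Unit = {
    val reasons = Seq(DriverKill, ContainerExited(exitCausedByApp = true), UnknownLoss)
    reasons.foreach(r => println(s"$r -> counts toward failure limit: ${countsTowardFailureLimit(r)}"))
  }
}
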
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6f0c910c00..505c161141 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -64,8 +64,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private val listenerBus = scheduler.sc.listenerBus
- // Executors we have requested the cluster manager to kill that have not died yet
- private val executorsPendingToRemove = new HashSet[String]
+ // Executors we have requested the cluster manager to kill that have not died yet; maps
+ // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
+ // be considered an app-related failure).
+ private val executorsPendingToRemove = new HashMap[String, Boolean]
// A map to store hostname with its possible task number running on it
protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
@@ -250,15 +252,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case Some(executorInfo) =>
// This must be synchronized because variables mutated
// in this block are read when requesting executors
- CoarseGrainedSchedulerBackend.this.synchronized {
+ val killed = CoarseGrainedSchedulerBackend.this.synchronized {
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
- executorsPendingToRemove -= executorId
executorsPendingLossReason -= executorId
+ executorsPendingToRemove.remove(executorId).getOrElse(false)
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
- scheduler.executorLost(executorId, reason)
+ scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
listenerBus.post(
SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
case None => logInfo(s"Asked to remove non-existent executor $executorId")
@@ -459,6 +461,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Request that the cluster manager kill the specified executors.
*
+ * When asking the executor to be replaced, the executor loss is considered a failure, and
+ * killed tasks that are running on the executor will count towards the failure limits. If no
+ * replacement is being requested, then the tasks will not count towards the limit.
+ *
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones
* @param force whether to force kill busy executors
@@ -479,7 +485,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
.filter { id => force || !scheduler.isExecutorBusy(id) }
- executorsPendingToRemove ++= executorsToKill
+ executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
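
The backend changes above turn executorsPendingToRemove from a HashSet into a HashMap whose value records whether the executor was killed without a replacement being requested; that flag is what later causes the loss to be reported as ExecutorKilled. A self-contained sketch of the pattern (field and method names are illustrative, not the backend's API):

import scala.collection.mutable

object PendingRemovalSketch {
  // Maps executor ID to "killed without replacement?" (i.e. !replace), as in the patch.
  private val executorsPendingToRemove = mutable.HashMap.empty[String, Boolean]

  def requestKill(executorIds: Seq[String], replace: Boolean): Unit =
    executorIds.foreach(id => executorsPendingToRemove(id) = !replace)

  // On removal, pop the entry; a missing key means the loss was not driver-initiated.
  def lossReasonFor(executorId: String, originalReason: String): String = {
    val killed = executorsPendingToRemove.remove(executorId).getOrElse(false)
    if (killed) "ExecutorKilled" else originalReason
  }

  def main(args: Array[String]): Unit = {
    requestKill(Seq("exec-1"), replace = false)
    requestKill(Seq("exec-2"), replace = true)
    Seq("exec-1", "exec-2", "exec-3").foreach { id =>
      println(s"$id -> ${lossReasonFor(id, originalReason = "Remote RPC client disassociated")}")
    }
  }
}
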
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 7e39c3ea56..73cd9031f0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -481,7 +481,7 @@ private[yarn] class YarnAllocator(
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
- case unknown =>
+ case _ =>
numExecutorsFailed += 1
(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
@@ -493,7 +493,7 @@ private[yarn] class YarnAllocator(
} else {
logInfo(containerExitReason)
}
- ExecutorExited(0, exitCausedByApp, containerExitReason)
+ ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
} else {
// If we have already released this container, then it must mean
// that the driver has explicitly requested it to be killed
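
Finally, the YarnAllocator hunks above replace an unused case binding with a wildcard and pass the container's real exit status into ExecutorExited instead of a hard-coded 0. A simplified sketch of that classification step, with made-up status constants standing in for YARN's ContainerExitStatus values rather than the allocator's real pattern matching:

object ContainerExitSketch {
  // Illustrative constants only; the real code inspects YARN's ContainerExitStatus and diagnostics.
  val Success = 0
  val Preempted = -1
  val MemExceeded = -2

  // Returns (exitCausedByApp, message), echoing the shape of the match in YarnAllocator above.
  def classify(exitStatus: Int, diagnostics: String): (Boolean, String) = exitStatus match {
    case s if s == Success || s == Preempted =>
      (false, s"Container finished or was preempted (status $s).")
    case MemExceeded =>
      (true, s"Container killed for exceeding memory limits: $diagnostics")
    case _ =>
      (true, s"Container marked as failed. Exit status: $exitStatus. Diagnostics: $diagnostics")
  }

  def main(args: Array[String]): Unit = {
    val exitStatus = MemExceeded
    val (causedByApp, msg) = classify(exitStatus, "Memory limit exceeded.")
    // The patch propagates the real exitStatus into ExecutorExited instead of always reporting 0.
    println(s"exitStatus=$exitStatus exitCausedByApp=$causedByApp message=$msg")
  }
}
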