author     mcheah <mcheah@palantir.com>          2015-09-10 11:58:54 -0700
committer  Andrew Or <andrew@databricks.com>     2015-09-10 11:58:54 -0700
commit     af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0 (patch)
tree       979a7e64505f9ccdabf98148b8f8e9e745448e65 /yarn
parent     a76bde9dae54c4641e21f3c1ceb4870e3dc91881 (diff)
[SPARK-8167] Make tasks that fail from YARN preemption not fail job
The architecture is that, in YARN mode, if the driver detects that an executor has disconnected, it asks the ApplicationMaster why the executor died. If the ApplicationMaster is aware that the executor died because of preemption, all tasks associated with that executor are not marked as failed. The executor is still removed from the driver's list of available executors, however.

There are a few open questions:

1. Should standalone mode have a similar "get executor loss reason" as well? I localized this change as much as possible to affect only YARN, but there could be a valid case to differentiate executor losses in standalone mode as well.
2. I make a pretty strong assumption in YarnAllocator that getExecutorLossReason(executorId) will only be called once per executor id; I do this so that I can remove the metadata from the in-memory map to avoid object accumulation. It's not clear if I'm being overly zealous to save space, however.

cc vanzin specifically for review because it collided with some earlier YARN scheduling work.
cc JoshRosen because it's similar to output commit coordination we did in the past.
cc andrewor14 for our discussion on how to get executor exit codes and loss reasons.

Author: mcheah <mcheah@palantir.com>

Closes #8007 from mccheah/feature/preemption-handling.
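To make the handshake above concrete, here is a minimal, standalone Scala sketch of the flow this commit implements: a caller asking for an executor's loss reason is queued, the answer is sent back once the next allocation cycle reveals why the container exited, and the bookkeeping entry is dropped after it has been answered once. All names here (PendingReply, LossReasonTracker, enqueue, resolve) are illustrative stand-ins, not the patch's actual classes; the real logic lives in ApplicationMaster and YarnAllocator as shown in the diff below.

import scala.collection.mutable

// Stand-in for Spark's RpcCallContext: something we can reply to later.
trait PendingReply {
  def reply(reason: String): Unit
}

class LossReasonTracker {
  // Executor ID -> callers waiting to hear why that executor was lost
  // (mirrors pendingLossReasonRequests in the patch).
  private val pending = new mutable.HashMap[String, mutable.Buffer[PendingReply]]
  // Executors we still know about (mirrors executorIdToContainer); sample IDs are made up.
  private val knownExecutors = mutable.Set("exec-1", "exec-2")

  // The driver asked "why did this executor die?". We cannot answer until the
  // next allocation cycle reports the completed container, so queue the caller.
  def enqueue(executorId: String, caller: PendingReply): Unit = synchronized {
    if (knownExecutors.contains(executorId)) {
      pending.getOrElseUpdate(executorId, mutable.ArrayBuffer.empty[PendingReply]) += caller
    } else {
      println(s"Tried to get the loss reason for non-existent executor $executorId")
    }
  }

  // Called once the completed container has been classified: answer every queued
  // caller, then drop the entry so the map does not accumulate. This reflects the
  // "called once per executor id" assumption mentioned in the commit message.
  def resolve(executorId: String, reason: String): Unit = synchronized {
    pending.remove(executorId).foreach(_.foreach(_.reply(reason)))
    knownExecutors -= executorId
  }
}

object LossReasonDemo extends App {
  val tracker = new LossReasonTracker
  tracker.enqueue("exec-1", new PendingReply {
    def reply(reason: String): Unit = println(s"Driver learned: $reason")
  })
  // Later, the allocator discovers the container was preempted and replies; in the
  // real patch a preempted container counts as a normal exit, so the tasks it was
  // running are not counted as failures.
  tracker.resolve("exec-1", "Container was preempted.")
}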
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala |  7
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala     | 92
2 files changed, 75 insertions(+), 24 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 991b5cec00..93621b44c9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -590,6 +590,13 @@ private[spark] class ApplicationMaster(
case None => logWarning("Container allocator is not ready to kill executors yet.")
}
context.reply(true)
+
+ case GetExecutorLossReason(eid) =>
+ Option(allocator) match {
+ case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
+ case None => logWarning(s"Container allocator is not ready to find" +
+ s" executor loss reasons yet.")
+ }
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 5f897cbcb4..fd88b8b7fe 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,8 +21,9 @@ import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern
-import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConverters._
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -36,8 +37,9 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.util.Utils
/**
@@ -93,6 +95,11 @@ private[yarn] class YarnAllocator(
sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
}
+ // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
+ // list of requesters that should be responded to once we find out why the given executor
+ // was lost.
+ private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -235,9 +242,7 @@ private[yarn] class YarnAllocator(
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
-
processCompletedContainers(completedContainers.asScala)
-
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning))
}
@@ -429,7 +434,7 @@ private[yarn] class YarnAllocator(
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId
val alreadyReleased = releasedContainers.remove(containerId)
- if (!alreadyReleased) {
+ val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning -= 1
@@ -440,22 +445,42 @@ private[yarn] class YarnAllocator(
// Hadoop 2.2.X added a ContainerExitStatus we should switch to use
// there are some exit status' we shouldn't necessarily count against us, but for
// now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus == ContainerExitStatus.PREEMPTED) {
- logInfo("Container preempted: " + containerId)
- } else if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- VMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- PMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus != 0) {
- logInfo("Container marked as failed: " + containerId +
- ". Exit status: " + completedContainer.getExitStatus +
- ". Diagnostics: " + completedContainer.getDiagnostics)
- numExecutorsFailed += 1
+ val exitStatus = completedContainer.getExitStatus
+ val (isNormalExit, containerExitReason) = exitStatus match {
+ case ContainerExitStatus.SUCCESS =>
+ (true, s"Executor for container $containerId exited normally.")
+ case ContainerExitStatus.PREEMPTED =>
+ // Preemption should count as a normal exit, since YARN preempts containers merely
+ // to do resource sharing, and tasks that fail due to preempted executors could
+ // just as easily finish on any other executor. See SPARK-8167.
+ (true, s"Container $containerId was preempted.")
+ // Should probably still count memory exceeded exit codes towards task failures
+ case VMEM_EXCEEDED_EXIT_CODE =>
+ (false, memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ VMEM_EXCEEDED_PATTERN))
+ case PMEM_EXCEEDED_EXIT_CODE =>
+ (false, memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ PMEM_EXCEEDED_PATTERN))
+ case unknown =>
+ numExecutorsFailed += 1
+ (false, "Container marked as failed: " + containerId +
+ ". Exit status: " + completedContainer.getExitStatus +
+ ". Diagnostics: " + completedContainer.getDiagnostics)
+
+ }
+ if (isNormalExit) {
+ logInfo(containerExitReason)
+ } else {
+ logWarning(containerExitReason)
}
+ ExecutorExited(0, isNormalExit, containerExitReason)
+ } else {
+ // If we have already released this container, then it must mean
+ // that the driver has explicitly requested it to be killed
+ ExecutorExited(completedContainer.getExitStatus, isNormalExit = true,
+ s"Container $containerId exited from explicit termination request.")
}
if (allocatedContainerToHostMap.contains(containerId)) {
@@ -474,18 +499,35 @@ private[yarn] class YarnAllocator(
containerIdToExecutorId.remove(containerId).foreach { eid =>
executorIdToContainer.remove(eid)
-
+ pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
+ // Notify application of executor loss reasons so it can decide whether it should abort
+ pendingRequests.foreach(_.reply(exitReason))
+ }
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
// Notify backend about the failure of the executor
numUnexpectedContainerRelease += 1
- driverRef.send(RemoveExecutor(eid,
- s"Yarn deallocated the executor $eid (container $containerId)"))
+ driverRef.send(RemoveExecutor(eid, exitReason))
}
}
}
}
+ /**
+ * Register that some RpcCallContext has asked the AM why the executor was lost. Note that
+ * we can only find the loss reason to send back in the next call to allocateResources().
+ */
+ private[yarn] def enqueueGetLossReasonRequest(
+ eid: String,
+ context: RpcCallContext): Unit = synchronized {
+ if (executorIdToContainer.contains(eid)) {
+ pendingLossReasonRequests
+ .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+ } else {
+ logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+ }
+ }
+
private def internalReleaseContainer(container: Container): Unit = {
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
@@ -501,6 +543,8 @@ private object YarnAllocator {
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
val VMEM_EXCEEDED_PATTERN =
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+ val VMEM_EXCEEDED_EXIT_CODE = -103
+ val PMEM_EXCEEDED_EXIT_CODE = -104
def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
val matcher = pattern.matcher(diagnostics)