author     Ryan Williams <ryan.blake.williams@gmail.com>   2015-10-19 16:34:15 -0700
committer  Andrew Or <andrew@databricks.com>                2015-10-19 16:34:15 -0700
commit     16906ef23a7aa2854c8cdcaa3bb3808ab39e0eec (patch)
tree       5ae77f9fd48113e8052d6a34dfbcff42fdaf611a /yarn/src
parent     fc26f32cf1bede8b9a1343dca0c0182107c9985e (diff)
[SPARK-11120] Allow sane default number of executor failures when dynamically allocating in YARN
I also added some information to the container-failure error messages about which host they failed on, which would have helped me identify the problem that led me to this JIRA and PR sooner.

Author: Ryan Williams <ryan.blake.williams@gmail.com>

Closes #9147 from ryan-williams/dyn-exec-failures.
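In short, the new default keys off the dynamic-allocation cap when dynamic allocation is in use. A minimal standalone sketch of that computation (not the ApplicationMaster code itself; it assumes a plain SparkConf and approximates the private Utils.isDynamicAllocationEnabled helper with a direct check of spark.dynamicAllocation.enabled):

import org.apache.spark.SparkConf

// Sketch of the default introduced by this patch.
def defaultMaxExecutorFailures(conf: SparkConf): Int = {
  // Approximation of Utils.isDynamicAllocationEnabled, a private Spark helper.
  val dynamicAllocation = conf.getBoolean("spark.dynamicAllocation.enabled", defaultValue = false)
  val key =
    if (dynamicAllocation) "spark.dynamicAllocation.maxExecutors"
    else "spark.executor.instances"
  // Twice the (maximum) number of executors, never less than 3.
  math.max(3, 2 * conf.getInt(key, 0))
}

// An explicit spark.yarn.max.executor.failures still overrides the computed default.
def maxExecutorFailures(conf: SparkConf): Int =
  conf.getInt("spark.yarn.max.executor.failures", defaultMaxExecutorFailures(conf))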
Diffstat (limited to 'yarn/src')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  19
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala      19
2 files changed, 26 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d1d248bf79..4b4d9990ce 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -62,10 +62,21 @@ private[spark] class ApplicationMaster(
.asInstanceOf[YarnConfiguration]
private val isClusterMode = args.userClass != null
- // Default to numExecutors * 2, with minimum of 3
- private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures",
- math.max(sparkConf.getInt("spark.executor.instances", 0) * 2, 3)))
+ // Default to twice the number of executors (twice the maximum number of executors if dynamic
+ // allocation is enabled), with a minimum of 3.
+
+ private val maxNumExecutorFailures = {
+ val defaultKey =
+ if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+ "spark.dynamicAllocation.maxExecutors"
+ } else {
+ "spark.executor.instances"
+ }
+ val effectiveNumExecutors = sparkConf.getInt(defaultKey, 0)
+ val defaultMaxNumExecutorFailures = math.max(3, 2 * effectiveNumExecutors)
+
+ sparkConf.getInt("spark.yarn.max.executor.failures", defaultMaxNumExecutorFailures)
+ }
@volatile private var exitCode = 0
@volatile private var unregistered = false
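For a concrete (hypothetical) example using the sketch above: with dynamic allocation capped at 50 executors, the default allows 100 failures, and an explicit setting still wins.

// Hypothetical configuration values, for illustration only.
val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "50")
// maxExecutorFailures(conf) == 100
// Setting spark.yarn.max.executor.failures to "10" would make it 10 instead.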
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 9e1ef1b3b4..1deaa3743d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -430,17 +430,20 @@ private[yarn] class YarnAllocator(
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId
val alreadyReleased = releasedContainers.remove(containerId)
+ val hostOpt = allocatedContainerToHostMap.get(containerId)
+ val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning -= 1
- logInfo("Completed container %s (state: %s, exit status: %s)".format(
+ logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
containerId,
+ onHostStr,
completedContainer.getState,
completedContainer.getExitStatus))
// Hadoop 2.2.X added a ContainerExitStatus we should switch to use
// there are some exit status' we shouldn't necessarily count against us, but for
- // now I think its ok as none of the containers are expected to exit
+ // now I think its ok as none of the containers are expected to exit.
val exitStatus = completedContainer.getExitStatus
val (isNormalExit, containerExitReason) = exitStatus match {
case ContainerExitStatus.SUCCESS =>
@@ -449,7 +452,7 @@ private[yarn] class YarnAllocator(
// Preemption should count as a normal exit, since YARN preempts containers merely
// to do resource sharing, and tasks that fail due to preempted executors could
// just as easily finish on any other executor. See SPARK-8167.
- (true, s"Container $containerId was preempted.")
+ (true, s"Container ${containerId}${onHostStr} was preempted.")
// Should probably still count memory exceeded exit codes towards task failures
case VMEM_EXCEEDED_EXIT_CODE =>
(false, memLimitExceededLogMessage(
@@ -461,7 +464,7 @@ private[yarn] class YarnAllocator(
PMEM_EXCEEDED_PATTERN))
case unknown =>
numExecutorsFailed += 1
- (false, "Container marked as failed: " + containerId +
+ (false, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
@@ -479,10 +482,10 @@ private[yarn] class YarnAllocator(
s"Container $containerId exited from explicit termination request.")
}
- if (allocatedContainerToHostMap.contains(containerId)) {
- val host = allocatedContainerToHostMap.get(containerId).get
- val containerSet = allocatedHostToContainersMap.get(host).get
-
+ for {
+ host <- hostOpt
+ containerSet <- allocatedHostToContainersMap.get(host)
+ } {
containerSet.remove(containerId)
if (containerSet.isEmpty) {
allocatedHostToContainersMap.remove(host)
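The YarnAllocator half of the patch threads the container's host (when known) into the completion and failure messages via onHostStr. A rough illustration of the resulting message shape, with a made-up container id and host name:

// Hypothetical values, purely to show the new message format.
val containerId = "container_1445295600000_0001_01_000002"
val hostOpt: Option[String] = Some("worker-3.example.com")
val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
println(s"Container marked as failed: $containerId$onHostStr. " +
  "Exit status: 1. Diagnostics: ...")
// Container marked as failed: container_1445295600000_0001_01_000002 on host:
// worker-3.example.com. Exit status: 1. Diagnostics: ...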