Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala             |  4
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala      | 34
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala             |  9
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 46
4 files changed, 85 insertions(+), 8 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8b07dc3af4..6184ad591c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -231,14 +231,14 @@ private[spark] class Client(
"Cluster's default value will be used.")
}
- sparkConf.get(ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
+ sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
try {
val method = appContext.getClass().getMethod(
"setAttemptFailuresValidityInterval", classOf[Long])
method.invoke(appContext, interval: java.lang.Long)
} catch {
case e: NoSuchMethodException =>
- logWarning(s"Ignoring ${ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
+ logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
"the version of YARN does not support it")
}
}
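
For context on the hunk above: setAttemptFailuresValidityInterval is looked up reflectively because it only exists on newer YARN releases (assumption: Hadoop 2.6+); on older clusters the setting is skipped with a warning. Where the method is available, the reflective invocation amounts to calling the setter directly, roughly as in this sketch (not part of the patch):

    sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
      // Direct call; only compiles against a YARN version that ships the API.
      appContext.setAttemptFailuresValidityInterval(interval)
    }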
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b59e6cff2f..066c665954 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -22,7 +22,7 @@ import java.util.concurrent._
import java.util.regex.Pattern
import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
@@ -41,7 +41,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -102,7 +102,13 @@ private[yarn] class YarnAllocator(
private var executorIdCounter: Int =
driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
- @volatile private var numExecutorsFailed = 0
+ // Queue to store the timestamp of failed executors
+ private val failedExecutorsTimeStamps = new Queue[Long]()
+
+ private var clock: Clock = new SystemClock
+
+ private val executorFailuresValidityInterval =
+ sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
@volatile private var targetNumExecutors =
YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
@@ -166,9 +172,26 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
+ /**
+ * Use a different clock for YarnAllocator. This is mainly used for testing.
+ */
+ def setClock(newClock: Clock): Unit = {
+ clock = newClock
+ }
+
def getNumExecutorsRunning: Int = numExecutorsRunning
- def getNumExecutorsFailed: Int = numExecutorsFailed
+ def getNumExecutorsFailed: Int = synchronized {
+ val endTime = clock.getTimeMillis()
+
+ while (executorFailuresValidityInterval > 0
+ && failedExecutorsTimeStamps.nonEmpty
+ && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
+ failedExecutorsTimeStamps.dequeue()
+ }
+
+ failedExecutorsTimeStamps.size
+ }
/**
* A sequence of pending container requests that have not yet been fulfilled.
@@ -527,7 +550,8 @@ private[yarn] class YarnAllocator(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
case _ =>
- numExecutorsFailed += 1
+ // Enqueue the timestamp of failed executor
+ failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 3816a84ab2..c4dd3202f0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -33,13 +33,20 @@ package object config {
.toSequence
.createOptional
- private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+ private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
.doc("Interval after which AM failures will be considered independent and " +
"not accumulate towards the attempt count.")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
+ private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+ ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
+ .doc("Interval after which Executor failures will be considered independent and not " +
+ "accumulate towards the attempt count.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createOptional
+
private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
.doc("Maximum number of AM attempts before failing the app.")
.intConf
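
Both validity intervals are time-typed configs, so they accept suffixed durations such as "100s" or "1h". A hypothetical application-side usage sketch (the keys come from this patch; the one-hour values are only examples):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // AM attempt failures older than 1h no longer count toward spark.yarn.maxAppAttempts.
      .set("spark.yarn.am.attemptFailuresValidityInterval", "1h")
      // Executor failures older than 1h no longer count toward the allowed executor failures.
      .set("spark.yarn.executor.failuresValidityInterval", "1h")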
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6a861d6f66..f4f8bd435d 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.ManualClock
class MockResolver extends DNSToSwitchMapping {
@@ -275,4 +276,49 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}
+
+ test("window based failure executor counting") {
+ sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
+ val handler = createAllocator(4)
+ val clock = new ManualClock(0L)
+ handler.setClock(clock)
+
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getPendingAllocate.size should be (4)
+
+ val containers = Seq(
+ createContainer("host1"),
+ createContainer("host2"),
+ createContainer("host3"),
+ createContainer("host4")
+ )
+ handler.handleAllocatedContainers(containers)
+
+ val failedStatuses = containers.map { c =>
+ ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
+ }
+
+ handler.getNumExecutorsFailed should be (0)
+
+ clock.advance(100 * 1000L)
+ handler.processCompletedContainers(failedStatuses.slice(0, 1))
+ handler.getNumExecutorsFailed should be (1)
+
+ clock.advance(101 * 1000L)
+ handler.getNumExecutorsFailed should be (0)
+
+ handler.processCompletedContainers(failedStatuses.slice(1, 3))
+ handler.getNumExecutorsFailed should be (2)
+
+ clock.advance(50 * 1000L)
+ handler.processCompletedContainers(failedStatuses.slice(3, 4))
+ handler.getNumExecutorsFailed should be (3)
+
+ clock.advance(51 * 1000L)
+ handler.getNumExecutorsFailed should be (1)
+
+ clock.advance(50 * 1000L)
+ handler.getNumExecutorsFailed should be (0)
+ }
}
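
The test's timeline can also be replayed against the illustrative WindowedFailureCounter sketch above, which makes the expiry arithmetic easy to follow (100-second window, manually advanced time source; a hedged walkthrough, not part of the patch):

    var nowMs = 0L
    val counter = new WindowedFailureCounter(100 * 1000L, () => nowMs)

    nowMs = 100 * 1000L; counter.recordFailure()
    assert(counter.count == 1)

    nowMs = 201 * 1000L                 // the t = 100s failure is now older than 100s
    assert(counter.count == 0)

    counter.recordFailure(); counter.recordFailure()
    assert(counter.count == 2)

    nowMs = 251 * 1000L; counter.recordFailure()
    assert(counter.count == 3)

    nowMs = 302 * 1000L                 // the two t = 201s failures have expired
    assert(counter.count == 1)

    nowMs = 352 * 1000L                 // the t = 251s failure has expired as well
    assert(counter.count == 0)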