author    jerryshao <sshao@hortonworks.com>  2016-04-28 12:38:19 -0500
committer Tom Graves <tgraves@yahoo-inc.com>  2016-04-28 12:38:19 -0500
commit    8b44bd52fa40c0fc7d34798c3654e31533fd3008 (patch)
tree      3642569112ccf117f1c91b363d2aa0445dd2e990 /yarn
parent    9e785079b6ed4ea691c3c14c762a7f73fb6254bf (diff)
[SPARK-6735][YARN] Add window based executor failure tracking mechanism for long running service
This work is based on twinkle-sachdeva's proposal. In parallel with the existing window-based tracking of AM failures, this adds a similar mechanism for executor failure tracking, which is useful for long-running Spark services to mitigate executor failure problems. Please help to review, tgravescs sryza and vanzin.

Author: jerryshao <sshao@hortonworks.com>

Closes #10241 from jerryshao/SPARK-6735.
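The gist of the change: instead of a monotonically increasing failure counter, executor failure timestamps go into a queue, and stale entries (older than the validity interval) are evicted whenever the count is read. Below is a minimal, self-contained sketch of that idea with an injectable clock for testing — the class and names here are illustrative, not the patch's actual API:

import scala.collection.mutable.Queue

// Hedged sketch of window-based failure tracking (hypothetical names).
// A non-positive validity interval disables eviction, so every failure
// counts for the lifetime of the application, matching the old behavior.
class FailureTracker(validityIntervalMs: Long,
                     nowMs: () => Long = () => System.currentTimeMillis()) {
  private val failureTimestamps = new Queue[Long]()

  def recordFailure(): Unit = synchronized {
    failureTimestamps.enqueue(nowMs())
  }

  def numFailures: Int = synchronized {
    val cutoff = nowMs() - validityIntervalMs
    while (validityIntervalMs > 0 &&
        failureTimestamps.nonEmpty &&
        failureTimestamps.head < cutoff) {
      failureTimestamps.dequeue()
    }
    failureTimestamps.size
  }
}

Because timestamps are enqueued in order, eviction only ever inspects the head of the queue, so each failure is enqueued and dequeued at most once (amortized O(1) per read).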
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala             |  4
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala      | 34
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala             |  9
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 46
4 files changed, 85 insertions(+), 8 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8b07dc3af4..6184ad591c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -231,14 +231,14 @@ private[spark] class Client(
"Cluster's default value will be used.")
}
- sparkConf.get(ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
+ sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
try {
val method = appContext.getClass().getMethod(
"setAttemptFailuresValidityInterval", classOf[Long])
method.invoke(appContext, interval: java.lang.Long)
} catch {
case e: NoSuchMethodException =>
- logWarning(s"Ignoring ${ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
+ logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
"the version of YARN does not support it")
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b59e6cff2f..066c665954 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -22,7 +22,7 @@ import java.util.concurrent._
import java.util.regex.Pattern
import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
@@ -41,7 +41,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -102,7 +102,13 @@ private[yarn] class YarnAllocator(
private var executorIdCounter: Int =
driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
- @volatile private var numExecutorsFailed = 0
+ // Queue to store the timestamp of failed executors
+ private val failedExecutorsTimeStamps = new Queue[Long]()
+
+ private var clock: Clock = new SystemClock
+
+ private val executorFailuresValidityInterval =
+ sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
@volatile private var targetNumExecutors =
YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
@@ -166,9 +172,26 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
+ /**
+ * Use a different clock for YarnAllocator. This is mainly used for testing.
+ */
+ def setClock(newClock: Clock): Unit = {
+ clock = newClock
+ }
+
def getNumExecutorsRunning: Int = numExecutorsRunning
- def getNumExecutorsFailed: Int = numExecutorsFailed
+ def getNumExecutorsFailed: Int = synchronized {
+ val endTime = clock.getTimeMillis()
+
+ while (executorFailuresValidityInterval > 0
+ && failedExecutorsTimeStamps.nonEmpty
+ && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
+ failedExecutorsTimeStamps.dequeue()
+ }
+
+ failedExecutorsTimeStamps.size
+ }
/**
* A sequence of pending container requests that have not yet been fulfilled.
@@ -527,7 +550,8 @@ private[yarn] class YarnAllocator(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
case _ =>
- numExecutorsFailed += 1
+ // Enqueue the timestamp of failed executor
+ failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 3816a84ab2..c4dd3202f0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -33,13 +33,20 @@ package object config {
.toSequence
.createOptional
- private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+ private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
.doc("Interval after which AM failures will be considered independent and " +
"not accumulate towards the attempt count.")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
+ private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+ ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
+ .doc("Interval after which Executor failures will be considered independent and not " +
+ "accumulate towards the attempt count.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createOptional
+
private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
.doc("Maximum number of AM attempts before failing the app.")
.intConf
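The new key is a standard Spark time config, so it accepts the usual suffixed duration strings (the test below uses "100s"). A hedged example of wiring it up for a long-running application — the one-hour value is illustrative:

import org.apache.spark.SparkConf

// Illustrative: failures more than an hour old stop counting toward the
// executor-failure limit; leaving the key unset keeps the old behavior
// (failures accumulate for the whole application lifetime).
val conf = new SparkConf()
  .set("spark.yarn.executor.failuresValidityInterval", "1h")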
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6a861d6f66..f4f8bd435d 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.ManualClock
class MockResolver extends DNSToSwitchMapping {
@@ -275,4 +276,49 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}
+
+ test("window based failure executor counting") {
+ sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
+ val handler = createAllocator(4)
+ val clock = new ManualClock(0L)
+ handler.setClock(clock)
+
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getPendingAllocate.size should be (4)
+
+ val containers = Seq(
+ createContainer("host1"),
+ createContainer("host2"),
+ createContainer("host3"),
+ createContainer("host4")
+ )
+ handler.handleAllocatedContainers(containers)
+
+ val failedStatuses = containers.map { c =>
+ ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
+ }
+
+ handler.getNumExecutorsFailed should be (0)
+
+ clock.advance(100 * 1000L)
+ handler.processCompletedContainers(failedStatuses.slice(0, 1))
+ handler.getNumExecutorsFailed should be (1)
+
+ clock.advance(101 * 1000L)
+ handler.getNumExecutorsFailed should be (0)
+
+ handler.processCompletedContainers(failedStatuses.slice(1, 3))
+ handler.getNumExecutorsFailed should be (2)
+
+ clock.advance(50 * 1000L)
+ handler.processCompletedContainers(failedStatuses.slice(3, 4))
+ handler.getNumExecutorsFailed should be (3)
+
+ clock.advance(51 * 1000L)
+ handler.getNumExecutorsFailed should be (1)
+
+ clock.advance(50 * 1000L)
+ handler.getNumExecutorsFailed should be (0)
+ }
}