| author    | jerryshao <sshao@hortonworks.com>        | 2016-04-28 12:38:19 -0500 |
|-----------|------------------------------------------|---------------------------|
| committer | Tom Graves <tgraves@yahoo-inc.com>       | 2016-04-28 12:38:19 -0500 |
| commit    | 8b44bd52fa40c0fc7d34798c3654e31533fd3008 |                           |
| tree      | 3642569112ccf117f1c91b363d2aa0445dd2e990 |                           |
| parent    | 9e785079b6ed4ea691c3c14c762a7f73fb6254bf |                           |
[SPARK-6735][YARN] Add window-based executor failure tracking mechanism for long-running services
This work is based on twinkle-sachdeva's proposal. YARN already offers a validity-interval mechanism for AM failures; this change adds a similar window-based mechanism for executor failure tracking: executor failures older than a configurable interval no longer count toward the failure limit. This is useful for long-running Spark services, which would otherwise accumulate executor failures indefinitely. A distilled sketch of the idea follows the diffstat below.
Please help to review, tgravescs, sryza and vanzin.
Author: jerryshao <sshao@hortonworks.com>
Closes #10241 from jerryshao/SPARK-6735.
5 files changed, 93 insertions, 8 deletions
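To make the mechanism concrete before reading the diff: failures are recorded as timestamps in a queue, and entries older than the validity interval are pruned whenever the count is read. Below is a minimal standalone sketch of that idea with illustrative names (`WindowedFailureTracker` is not part of the patch; the real implementation is the `failedExecutorsTimeStamps` queue in `YarnAllocator`, shown in the diff):

```scala
import scala.collection.mutable.Queue

// Sketch only: record each failure as a timestamp and prune entries that
// have fallen out of the validity window before reporting the count.
class WindowedFailureTracker(validityIntervalMs: Long) {
  private val failureTimestamps = new Queue[Long]()

  def recordFailure(nowMs: Long): Unit = failureTimestamps.enqueue(nowMs)

  def failureCount(nowMs: Long): Int = {
    // A non-positive interval disables pruning, i.e. plain cumulative counting.
    while (validityIntervalMs > 0 &&
        failureTimestamps.nonEmpty &&
        failureTimestamps.head < nowMs - validityIntervalMs) {
      failureTimestamps.dequeue()
    }
    failureTimestamps.size
  }
}
```

When the interval is unset, the patch falls back to `-1L`, the pruning loop never runs, and the behaviour matches the old cumulative `numExecutorsFailed` counter.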
```diff
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 09701abdb0..3bd16bf60c 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -372,6 +372,14 @@ If you need a reference to the proper location to put log files in the YARN so t
   </td>
 </tr>
 <tr>
+  <td><code>spark.yarn.executor.failuresValidityInterval</code></td>
+  <td>(none)</td>
+  <td>
+  Defines the validity interval for executor failure tracking.
+  Executor failures which are older than the validity interval will be ignored.
+  </td>
+</tr>
+<tr>
   <td><code>spark.yarn.submit.waitAppCompletion</code></td>
   <td><code>true</code></td>
   <td>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8b07dc3af4..6184ad591c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -231,14 +231,14 @@ private[spark] class Client(
         "Cluster's default value will be used.")
     }
 
-    sparkConf.get(ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
+    sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
       try {
         val method = appContext.getClass().getMethod(
           "setAttemptFailuresValidityInterval", classOf[Long])
         method.invoke(appContext, interval: java.lang.Long)
       } catch {
         case e: NoSuchMethodException =>
-          logWarning(s"Ignoring ${ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
+          logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
             "the version of YARN does not support it")
       }
     }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b59e6cff2f..066c665954 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -22,7 +22,7 @@ import java.util.concurrent._
 import java.util.regex.Pattern
 
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -41,7 +41,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -102,7 +102,13 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter: Int =
     driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
 
-  @volatile private var numExecutorsFailed = 0
+  // Queue to store the timestamp of failed executors
+  private val failedExecutorsTimeStamps = new Queue[Long]()
+
+  private var clock: Clock = new SystemClock
+
+  private val executorFailuresValidityInterval =
+    sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
 
   @volatile private var targetNumExecutors =
     YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
@@ -166,9 +172,26 @@ private[yarn] class YarnAllocator(
   private[yarn] val containerPlacementStrategy =
     new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
 
+  /**
+   * Use a different clock for YarnAllocator. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+    clock = newClock
+  }
+
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
-  def getNumExecutorsFailed: Int = numExecutorsFailed
+  def getNumExecutorsFailed: Int = synchronized {
+    val endTime = clock.getTimeMillis()
+
+    while (executorFailuresValidityInterval > 0
+      && failedExecutorsTimeStamps.nonEmpty
+      && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
+      failedExecutorsTimeStamps.dequeue()
+    }
+
+    failedExecutorsTimeStamps.size
+  }
 
   /**
    * A sequence of pending container requests that have not yet been fulfilled.
@@ -527,7 +550,8 @@ private[yarn] class YarnAllocator(
               completedContainer.getDiagnostics,
               PMEM_EXCEEDED_PATTERN))
           case _ =>
-            numExecutorsFailed += 1
+            // Enqueue the timestamp of failed executor
+            failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
             (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 3816a84ab2..c4dd3202f0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -33,13 +33,20 @@ package object config {
     .toSequence
     .createOptional
 
-  private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+  private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
     ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
       .doc("Interval after which AM failures will be considered independent and " +
         "not accumulate towards the attempt count.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+    ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
+      .doc("Interval after which Executor failures will be considered independent and not " +
+        "accumulate towards the attempt count.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
   private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
     .doc("Maximum number of AM attempts before failing the app.")
     .intConf
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6a861d6f66..f4f8bd435d 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.yarn.YarnAllocator._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.ManualClock
 
 class MockResolver extends DNSToSwitchMapping {
 
@@ -275,4 +276,49 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
+
+  test("window based failure executor counting") {
+    sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
+    val handler = createAllocator(4)
+    val clock = new ManualClock(0L)
+    handler.setClock(clock)
+
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (4)
+
+    val containers = Seq(
+      createContainer("host1"),
+      createContainer("host2"),
+      createContainer("host3"),
+      createContainer("host4")
+    )
+    handler.handleAllocatedContainers(containers)
+
+    val failedStatuses = containers.map { c =>
+      ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
+    }
+
+    handler.getNumExecutorsFailed should be (0)
+
+    clock.advance(100 * 1000L)
+    handler.processCompletedContainers(failedStatuses.slice(0, 1))
+    handler.getNumExecutorsFailed should be (1)
+
+    clock.advance(101 * 1000L)
+    handler.getNumExecutorsFailed should be (0)
+
+    handler.processCompletedContainers(failedStatuses.slice(1, 3))
+    handler.getNumExecutorsFailed should be (2)
+
+    clock.advance(50 * 1000L)
+    handler.processCompletedContainers(failedStatuses.slice(3, 4))
+    handler.getNumExecutorsFailed should be (3)
+
+    clock.advance(51 * 1000L)
+    handler.getNumExecutorsFailed should be (1)
+
+    clock.advance(50 * 1000L)
+    handler.getNumExecutorsFailed should be (0)
+  }
 }
```
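For users, the new key is declared with `timeConf(TimeUnit.MILLISECONDS)`, so it accepts Spark's usual time-suffix strings (`100s` as in the test above, `1h`, and so on). A hedged example of enabling it for a long-running application (the app name and interval are arbitrary):

```scala
import org.apache.spark.SparkConf

// Illustrative setup: executor failures older than one hour no longer
// count toward the maximum-executor-failures check.
val conf = new SparkConf()
  .setAppName("long-running-service") // placeholder name
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
```

The same value can also be passed at submit time with `--conf spark.yarn.executor.failuresValidityInterval=1h`.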
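A side note on the `Client.scala` hunk: `setAttemptFailuresValidityInterval` only exists in newer YARN releases, so the patch keeps resolving it via reflection; a direct call would not compile against older Hadoop versions. The same guarded-reflection pattern in isolation, as a sketch (the wrapper function is hypothetical):

```scala
// Hypothetical wrapper around the pattern used in Client.scala: look the
// setter up at runtime and skip it when the linked YARN version predates
// it (the real code logs a warning instead of staying silent).
def setValidityIntervalIfSupported(appContext: AnyRef, intervalMs: Long): Unit = {
  try {
    val method = appContext.getClass.getMethod(
      "setAttemptFailuresValidityInterval", classOf[Long])
    method.invoke(appContext, intervalMs: java.lang.Long)
  } catch {
    case _: NoSuchMethodException => // older YARN: keep the cluster default
  }
}
```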
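The new test is deterministic because `YarnAllocator` now takes an injectable `Clock`: production code keeps the `SystemClock` default, while the suite calls `setClock(new ManualClock(0L))` and advances virtual time instead of sleeping. The same pattern is shown below against the `WindowedFailureTracker` sketch from before the diff, with a small stand-in clock (Spark's `ManualClock` is internal to the project):

```scala
// Minimal stand-in for an injectable manual clock: time only moves when
// the test advances it explicitly.
class FakeClock(private var nowMs: Long = 0L) {
  def advance(deltaMs: Long): Unit = nowMs += deltaMs
  def getTimeMillis(): Long = nowMs
}

val clock = new FakeClock(0L)
val tracker = new WindowedFailureTracker(100 * 1000L) // 100s window

tracker.recordFailure(clock.getTimeMillis())
assert(tracker.failureCount(clock.getTimeMillis()) == 1)

// Once more than the 100s window has elapsed, the failure has expired.
clock.advance(101 * 1000L)
assert(tracker.failureCount(clock.getTimeMillis()) == 0)
```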