aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/test/scala/org
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-04-28 12:38:19 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-04-28 12:38:19 -0500
commit8b44bd52fa40c0fc7d34798c3654e31533fd3008 (patch)
tree3642569112ccf117f1c91b363d2aa0445dd2e990 /yarn/src/test/scala/org
parent9e785079b6ed4ea691c3c14c762a7f73fb6254bf (diff)
downloadspark-8b44bd52fa40c0fc7d34798c3654e31533fd3008.tar.gz
spark-8b44bd52fa40c0fc7d34798c3654e31533fd3008.tar.bz2
spark-8b44bd52fa40c0fc7d34798c3654e31533fd3008.zip
[SPARK-6735][YARN] Add window based executor failure tracking mechanism for long running service
This work is based on twinkle-sachdeva 's proposal. In parallel to such mechanism for AM failures, here add similar mechanism for executor failure tracking, this is useful for long running Spark service to mitigate the executor failure problems. Please help to review, tgravescs sryza and vanzin Author: jerryshao <sshao@hortonworks.com> Closes #10241 from jerryshao/SPARK-6735.
Diffstat (limited to 'yarn/src/test/scala/org')
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala46
1 file changed, 46 insertions, 0 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6a861d6f66..f4f8bd435d 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.ManualClock
class MockResolver extends DNSToSwitchMapping {
@@ -275,4 +276,49 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}
+
+ test("window based failure executor counting") {
+ sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
+ val handler = createAllocator(4)
+ val clock = new ManualClock(0L)
+ handler.setClock(clock)
+
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getPendingAllocate.size should be (4)
+
+ val containers = Seq(
+ createContainer("host1"),
+ createContainer("host2"),
+ createContainer("host3"),
+ createContainer("host4")
+ )
+ handler.handleAllocatedContainers(containers)
+
+ val failedStatuses = containers.map { c =>
+ ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
+ }
+
+ handler.getNumExecutorsFailed should be (0)
+
+ clock.advance(100 * 1000L)
+ handler.processCompletedContainers(failedStatuses.slice(0, 1))
+ handler.getNumExecutorsFailed should be (1)
+
+ clock.advance(101 * 1000L)
+ handler.getNumExecutorsFailed should be (0)
+
+ handler.processCompletedContainers(failedStatuses.slice(1, 3))
+ handler.getNumExecutorsFailed should be (2)
+
+ clock.advance(50 * 1000L)
+ handler.processCompletedContainers(failedStatuses.slice(3, 4))
+ handler.getNumExecutorsFailed should be (3)
+
+ clock.advance(51 * 1000L)
+ handler.getNumExecutorsFailed should be (1)
+
+ clock.advance(50 * 1000L)
+ handler.getNumExecutorsFailed should be (0)
+ }
}