aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/test/scala/org
diff options
context:
space:
mode:
authorImran Rashid <irashid@cloudera.com>2016-12-15 08:29:56 -0600
committerImran Rashid <irashid@cloudera.com>2016-12-15 08:29:56 -0600
commit93cdb8a7d0f124b4db069fd8242207c82e263c52 (patch)
treec0f626664bfa6bad965b85a3cc54438bf15b4332 /yarn/src/test/scala/org
parent7d858bc5ce870a28a559f4e81dcfc54cbd128cb7 (diff)
downloadspark-93cdb8a7d0f124b4db069fd8242207c82e263c52.tar.gz
spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.tar.bz2
spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.zip
[SPARK-8425][CORE] Application Level Blacklisting
## What changes were proposed in this pull request? This builds upon the blacklisting introduced in SPARK-17675 to add blacklisting of executors and nodes for an entire Spark application. Resources are blacklisted based on tasks that fail, in tasksets that eventually complete successfully; they are automatically returned to the pool of active resources based on a timeout. Full details are available in a design doc attached to the jira. ## How was this patch tested? Added unit tests, ran them via Jenkins, also ran a handful of them in a loop to check for flakiness. The added tests include: - verifying BlacklistTracker works correctly - verifying TaskSchedulerImpl interacts with BlacklistTracker correctly (via a mock BlacklistTracker) - an integration test for the entire scheduler with blacklisting in a few different scenarios Author: Imran Rashid <irashid@cloudera.com> Author: mwws <wei.mao@intel.com> Closes #14079 from squito/blacklist-SPARK-8425.
Diffstat (limited to 'yarn/src/test/scala/org')
-rw-r--r--yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala57
1 file changed, 57 insertions, 0 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
new file mode 100644
index 0000000000..ffa0b58ee7
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import org.mockito.Mockito.when
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.serializer.JavaSerializer
+
+/**
+ * Verifies that the executor-request message built by YarnSchedulerBackend reflects the
+ * scheduler's current node blacklist and can be serialized (it is presumably sent over RPC
+ * to the YARN application master -- NOTE(review): confirm against YarnSchedulerBackend).
+ */
+class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with LocalSparkContext {
+
+  test("RequestExecutors reflects node blacklist and is serializable") {
+    // LocalSparkContext owns `sc` and stops it after the test.
+    sc = new SparkContext("local", "YarnSchedulerBackendSuite")
+    // Mock the scheduler; stub `sc` because the backend constructor/logic reads it,
+    // and `nodeBlacklist()` is re-stubbed per iteration below.
+    val sched = mock[TaskSchedulerImpl]
+    when(sched.sc).thenReturn(sc)
+    // Anonymous subclass solely to expose a public setter for the backend's
+    // (otherwise non-public) `hostToLocalTaskCount` field.
+    val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) {
+      def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
+        this.hostToLocalTaskCount = hostToLocalTaskCount
+      }
+    }
+    val ser = new JavaSerializer(sc.conf).newInstance()
+    // Exercise the cartesian product of: blacklist contents x requested executor
+    // counts x host-locality maps (including empty and overlapping-with-blacklist cases).
+    for {
+      blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
+      numRequested <- 0 until 10
+      hostToLocalCount <- IndexedSeq(
+        Map[String, Int](),
+        Map("a" -> 1, "b" -> 2)
+      )
+    } {
+      yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount)
+      when(sched.nodeBlacklist()).thenReturn(blacklist)
+      val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested)
+      // The request must carry the exact count and blacklist it was built from...
+      assert(req.requestedTotal === numRequested)
+      assert(req.nodeBlacklist === blacklist)
+      // ...and must not ask for locality on hosts that are blacklisted.
+      assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
+      // Serialize to make sure serialization doesn't throw an error
+      ser.serialize(req)
+    }
+  }
+
+}