diff options
author | Imran Rashid <irashid@cloudera.com> | 2016-12-15 08:29:56 -0600 |
---|---|---|
committer | Imran Rashid <irashid@cloudera.com> | 2016-12-15 08:29:56 -0600 |
commit | 93cdb8a7d0f124b4db069fd8242207c82e263c52 (patch) | |
tree | c0f626664bfa6bad965b85a3cc54438bf15b4332 /yarn/src | |
parent | 7d858bc5ce870a28a559f4e81dcfc54cbd128cb7 (diff) | |
download | spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.tar.gz spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.tar.bz2 spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.zip |
[SPARK-8425][CORE] Application Level Blacklisting
## What changes were proposed in this pull request?
This builds upon the blacklisting introduced in SPARK-17675 to add blacklisting of executors and nodes for an entire Spark application. Resources are blacklisted based on tasks that fail, in tasksets that eventually complete successfully; they are automatically returned to the pool of active resources based on a timeout. Full details are available in a design doc attached to the jira.
## How was this patch tested?
Added unit tests, ran them via Jenkins, also ran a handful of them in a loop to check for flakiness.
The added tests include:
- verifying BlacklistTracker works correctly
- verifying TaskSchedulerImpl interacts with BlacklistTracker correctly (via a mock BlacklistTracker)
- an integration test for the entire scheduler with blacklisting in a few different scenarios
Author: Imran Rashid <irashid@cloudera.com>
Author: mwws <wei.mao@intel.com>
Closes #14079 from squito/blacklist-SPARK-8425.
Diffstat (limited to 'yarn/src')
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala new file mode 100644 index 0000000000..ffa0b58ee7 --- /dev/null +++ b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster + +import org.mockito.Mockito.when +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite} +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.serializer.JavaSerializer + +class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with LocalSparkContext { + + test("RequestExecutors reflects node blacklist and is serializable") { + sc = new SparkContext("local", "YarnSchedulerBackendSuite") + val sched = mock[TaskSchedulerImpl] + when(sched.sc).thenReturn(sc) + val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) { + def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = { + this.hostToLocalTaskCount = hostToLocalTaskCount + } + } + val ser = new JavaSerializer(sc.conf).newInstance() + for { + blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c")) + numRequested <- 0 until 10 + hostToLocalCount <- IndexedSeq( + Map[String, Int](), + Map("a" -> 1, "b" -> 2) + ) + } { + yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount) + when(sched.nodeBlacklist()).thenReturn(blacklist) + val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested) + assert(req.requestedTotal === numRequested) + assert(req.nodeBlacklist === blacklist) + assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty) + // Serialize to make sure serialization doesn't throw an error + ser.serialize(req) + } + } + +} |