aboutsummaryrefslogtreecommitdiff
path: root/resource-managers/yarn/src/main/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'resource-managers/yarn/src/main/scala/org/apache')
-rw-r--r--resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala6
-rw-r--r--resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala40
-rw-r--r--resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala13
-rw-r--r--resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala2
4 files changed, 48 insertions, 13 deletions
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index f2b6324db6..257dc83621 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.RackResolver
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
@@ -83,7 +82,8 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack
private[yarn] class LocalityPreferredContainerPlacementStrategy(
val sparkConf: SparkConf,
val yarnConf: Configuration,
- val resource: Resource) {
+ val resource: Resource,
+ resolver: SparkRackResolver) {
/**
* Calculate each container's node locality and rack locality
@@ -139,7 +139,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
// still be allocated with new container request.
val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
val racks = hosts.map { h =>
- RackResolver.resolve(yarnConf, h).getNetworkLocation
+ resolver.resolve(yarnConf, h)
}.toSet
containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
new file mode 100644
index 0000000000..c711d088f2
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+import org.apache.log4j.{Level, Logger}
+
+/**
+ * Wrapper around YARN's [[RackResolver]]. This allows Spark tests to easily override the
+ * default behavior, since YARN's class self-initializes the first time it's called, and
+ * future calls all use the initial configuration.
+ */
+private[yarn] class SparkRackResolver {
+
+ // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
+ if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
+ Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
+ }
+
+ def resolve(conf: Configuration, hostName: String): String = {
+ RackResolver.resolve(conf, hostName).getNetworkLocation()
+ }
+
+}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 25556763da..ed77a6e4a1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -65,16 +64,12 @@ private[yarn] class YarnAllocator(
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
- localResources: Map[String, LocalResource])
+ localResources: Map[String, LocalResource],
+ resolver: SparkRackResolver)
extends Logging {
import YarnAllocator._
- // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
- if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
- Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
- }
-
// Visible for testing.
val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
val allocatedContainerToHostMap = new HashMap[ContainerId, String]
@@ -159,7 +154,7 @@ private[yarn] class YarnAllocator(
// A container placement strategy based on pending tasks' locality preference
private[yarn] val containerPlacementStrategy =
- new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
+ new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)
/**
* Use a different clock for YarnAllocator. This is mainly used for testing.
@@ -424,7 +419,7 @@ private[yarn] class YarnAllocator(
// Match remaining by rack
val remainingAfterRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterHostMatches) {
- val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
+ val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 53fb467f64..72f4d273ab 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -75,7 +75,7 @@ private[spark] class YarnRMClient extends Logging {
registered = true
}
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
- localResources)
+ localResources, new SparkRackResolver())
}
/**