aboutsummaryrefslogtreecommitdiff
path: root/yarn/common
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2014-08-30 14:48:07 -0700
committerMatei Zaharia <matei@databricks.com>2014-08-30 14:48:07 -0700
commitb6cf1348170951396a6a5d8a65fb670382304f5b (patch)
tree0214f05d66fcdb69373b143ffb998e4de52ff02a /yarn/common
parentd90434c03564558a4208f64e15b20009eabe3645 (diff)
downloadspark-b6cf1348170951396a6a5d8a65fb670382304f5b.tar.gz
spark-b6cf1348170951396a6a5d8a65fb670382304f5b.tar.bz2
spark-b6cf1348170951396a6a5d8a65fb670382304f5b.zip
[SPARK-2889] Create Hadoop config objects consistently.
Different places in the code were instantiating Configuration / YarnConfiguration objects in different ways. This could lead to confusion for people who actually expected "spark.hadoop.*" options to end up in the configs used by Spark code, since that would only happen for the SparkContext's config. This change modifies most places to use SparkHadoopUtil to initialize configs, and make that method do the translation that previously was only done inside SparkContext. The places that were not changed fall in one of the following categories: - Test code where this doesn't really matter - Places deep in the code where plumbing SparkConf would be too difficult for very little gain - Default values for arguments - since the caller can provide their own config in that case Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #1843 from vanzin/SPARK-2889 and squashes the following commits: 52daf35 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889 f179013 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889 51e71cf [Marcelo Vanzin] Add test to ensure that overriding Yarn configs works. 53f9506 [Marcelo Vanzin] Add DeveloperApi annotation. 3d345cb [Marcelo Vanzin] Restore old method for backwards compat. fc45067 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889 0ac3fdf [Marcelo Vanzin] Merge branch 'master' into SPARK-2889 3f26760 [Marcelo Vanzin] Compilation fix. f16cadd [Marcelo Vanzin] Initialize config in SparkHadoopUtil. b8ab173 [Marcelo Vanzin] Update Utils API to take a Configuration argument. 1e7003f [Marcelo Vanzin] Replace explicit Configuration instantiation with SparkHadoopUtil.
Diffstat (limited to 'yarn/common')
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala3
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala3
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala8
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala9
-rw-r--r--yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala15
5 files changed, 22 insertions, 16 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 5f66a98e75..8c54840971 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -48,7 +48,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// optimal as more containers are available. Might need to handle this better.
private val sparkConf = new SparkConf()
- private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
+ private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
+ .asInstanceOf[YarnConfiguration]
private val isDriver = args.userClass != null
// Default to numExecutors * 2, with minimum of 3
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 2aa27a1908..ffe2731ca1 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -54,7 +54,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
// Always create a new config, dont reuse yarnConf.
- override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+ override def newConfiguration(conf: SparkConf): Configuration =
+ new YarnConfiguration(super.newConfiguration(conf))
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
override def addCredentials(conf: JobConf) {
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index d162b4c433..254774a6b8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler.cluster
import org.apache.spark._
-import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -26,14 +25,11 @@ import org.apache.spark.util.Utils
/**
* This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
*/
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
- extends TaskSchedulerImpl(sc) {
-
- def this(sc: SparkContext) = this(sc, new Configuration())
+private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- Option(YarnSparkHadoopUtil.lookupRack(conf, host))
+ Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 69f40225a2..4157ff95c2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -21,19 +21,15 @@ import org.apache.spark._
import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
-import org.apache.hadoop.conf.Configuration
/**
* This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
* ApplicationMaster, etc is done
*/
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
- extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
logInfo("Created YarnClusterScheduler")
- def this(sc: SparkContext) = this(sc, new Configuration())
-
// Nothing else for now ... initialize application master : which needs a SparkContext to
// determine how to allocate.
// Note that only the first creation of a SparkContext influences (and ideally, there must be
@@ -43,8 +39,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
+ Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
override def postStartHook() {
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 7650bd4396..75db8ee6d4 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.{FunSuite, Matchers}
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
@@ -61,4 +62,16 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
}
}
+ test("Yarn configuration override") {
+ val key = "yarn.nodemanager.hostname"
+ val default = new YarnConfiguration()
+
+ val sparkConf = new SparkConf()
+ .set("spark.hadoop." + key, "someHostName")
+ val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
+
+ yarnConf.getClass() should be (classOf[YarnConfiguration])
+ yarnConf.get(key) should not be default.get(key)
+ }
+
}