diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2015-04-17 14:21:51 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2015-04-17 14:21:51 -0500 |
commit | 50ab8a6543ad5c31e89c16df374d0cb13222fd1e (patch) | |
tree | dba7e72a087d6c42748603cae816208511c5e5cd /yarn/src/test/scala/org | |
parent | c84d91692aa25c01882bcc3f9fd5de3cfa786195 (diff) | |
download | spark-50ab8a6543ad5c31e89c16df374d0cb13222fd1e.tar.gz spark-50ab8a6543ad5c31e89c16df374d0cb13222fd1e.tar.bz2 spark-50ab8a6543ad5c31e89c16df374d0cb13222fd1e.zip |
[SPARK-2669] [yarn] Distribute client configuration to AM.
Currently, when Spark launches the Yarn AM, the process will use
the local Hadoop configuration on the node where the AM launches,
if one is present. A more correct approach is to use the same
configuration used to launch the Spark job, since the user may
have made modifications (such as adding app-specific configs).
The approach taken here is to use the distributed cache to make
all files in the Hadoop configuration directory available to the
AM. This is a little overkill since only the AM needs them (the
executors use the broadcast Hadoop configuration from the driver),
but is the easier approach.
Even though only a few files in that directory may end up being
used, all of them are uploaded. This allows supporting use cases
such as when auxiliary configuration files are used for SSL
configuration, or when uploading a Hive configuration directory.
Not all of these may be reflected in a o.a.h.conf.Configuration object,
but may be needed when a driver in cluster mode instantiates, for
example, a HiveConf object instead.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #4142 from vanzin/SPARK-2669 and squashes the following commits:
f5434b9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
013f0fb [Marcelo Vanzin] Review feedback.
f693152 [Marcelo Vanzin] Le sigh.
ed45b7d [Marcelo Vanzin] Zip all config files and upload them as an archive.
5927b6b [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
cbb9fb3 [Marcelo Vanzin] Remove stale test.
e3e58d0 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
e3d0613 [Marcelo Vanzin] Review feedback.
34bdbd8 [Marcelo Vanzin] Fix test.
022a688 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
a77ddd5 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
79221c7 [Marcelo Vanzin] [SPARK-2669] [yarn] Distribute client configuration to AM.
Diffstat (limited to 'yarn/src/test/scala/org')
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 29 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 6 |
2 files changed, 21 insertions, 14 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index c1b94ac9c5..a51c2005cb 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -20,6 +20,11 @@ package org.apache.spark.deploy.yarn import java.io.File import java.net.URI +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ HashMap => MutableHashMap } +import scala.reflect.ClassTag +import scala.util.Try + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.MRJobConfig @@ -30,11 +35,6 @@ import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ HashMap => MutableHashMap } -import scala.reflect.ClassTag -import scala.util.Try - import org.apache.spark.{SparkException, SparkConf} import org.apache.spark.util.Utils @@ -93,7 +93,7 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll { val env = new MutableHashMap[String, String]() val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - Client.populateClasspath(args, conf, sparkConf, env) + Client.populateClasspath(args, conf, sparkConf, env, true) val cp = env("CLASSPATH").split(":|;|<CPS>") s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => @@ -104,13 +104,16 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll { cp should not contain (uri.getPath()) } }) - if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - cp should contain("{{PWD}}") - } else if (Utils.isWindows) { - cp should contain("%PWD%") - } else { - cp should contain(Environment.PWD.$()) - } + val pwdVar = + if (classOf[Environment].getMethods().exists(_.getName == "$$")) { + "{{PWD}}" + } else if (Utils.isWindows) { + "%PWD%" + } else { + Environment.PWD.$() + } + cp should contain(pwdVar) + cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}") cp should not contain (Client.SPARK_JAR) cp should not contain (Client.APP_JAR) } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index a18c94d4ab..3877da4120 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -77,6 +77,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit private var yarnCluster: MiniYARNCluster = _ private var tempDir: File = _ private var fakeSparkJar: File = _ + private var hadoopConfDir: File = _ private var logConfDir: File = _ override def beforeAll() { @@ -120,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}") fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) + hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR) + assert(hadoopConfDir.mkdir()) + File.createTempFile("token", ".txt", hadoopConfDir) } override def afterAll() { @@ -258,7 +262,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit appArgs Utils.executeAndGetOutput(argv, - extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath())) + extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath())) } /** |