author     Marcelo Vanzin <vanzin@cloudera.com>    2015-04-17 14:21:51 -0500
committer  Thomas Graves <tgraves@apache.org>      2015-04-17 14:21:51 -0500
commit     50ab8a6543ad5c31e89c16df374d0cb13222fd1e (patch)
tree       dba7e72a087d6c42748603cae816208511c5e5cd /yarn/src/test/scala/org
parent     c84d91692aa25c01882bcc3f9fd5de3cfa786195 (diff)
[SPARK-2669] [yarn] Distribute client configuration to AM.
Currently, when Spark launches the Yarn AM, the process will use the local Hadoop configuration on the node where the AM launches, if one is present. A more correct approach is to use the same configuration used to launch the Spark job, since the user may have made modifications (such as adding app-specific configs).

The approach taken here is to use the distributed cache to make all files in the Hadoop configuration directory available to the AM. This is a little overkill since only the AM needs them (the executors use the broadcast Hadoop configuration from the driver), but is the easier approach.

Even though only a few files in that directory may end up being used, all of them are uploaded. This allows supporting use cases such as when auxiliary configuration files are used for SSL configuration, or when uploading a Hive configuration directory. Not all of these may be reflected in an o.a.h.conf.Configuration object, but may be needed when a driver in cluster mode instantiates, for example, a HiveConf object instead.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #4142 from vanzin/SPARK-2669 and squashes the following commits:

f5434b9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
013f0fb [Marcelo Vanzin] Review feedback.
f693152 [Marcelo Vanzin] Le sigh.
ed45b7d [Marcelo Vanzin] Zip all config files and upload them as an archive.
5927b6b [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
cbb9fb3 [Marcelo Vanzin] Remove stale test.
e3e58d0 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
e3d0613 [Marcelo Vanzin] Review feedback.
34bdbd8 [Marcelo Vanzin] Fix test.
022a688 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
a77ddd5 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
79221c7 [Marcelo Vanzin] [SPARK-2669] [yarn] Distribute client configuration to AM.
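The "zip all config files and upload them as an archive" step described above can be sketched in a few lines of plain Scala. This is an illustrative sketch only, assuming a hypothetical zipConfDir helper; it is not the code in Spark's Client, which additionally registers the archive with YARN's distributed cache:

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

// Zip every regular file in the Hadoop config directory into one archive,
// mirroring the "upload the whole directory" approach the commit describes.
def zipConfDir(confDir: File, archive: File): Unit = {
  val zipOut = new ZipOutputStream(new FileOutputStream(archive))
  try {
    confDir.listFiles().filter(_.isFile()).foreach { f =>
      zipOut.putNextEntry(new ZipEntry(f.getName()))
      val in = new FileInputStream(f)
      try {
        val buf = new Array[Byte](8192)
        var n = in.read(buf)
        while (n != -1) {
          zipOut.write(buf, 0, n)
          n = in.read(buf)
        }
      } finally {
        in.close()
      }
      zipOut.closeEntry()
    }
  } finally {
    zipOut.close()
  }
}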
Diffstat (limited to 'yarn/src/test/scala/org')
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala      | 29
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala |  6
2 files changed, 21 insertions, 14 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index c1b94ac9c5..a51c2005cb 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -20,6 +20,11 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.reflect.ClassTag
+import scala.util.Try
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
@@ -30,11 +35,6 @@ import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap => MutableHashMap }
-import scala.reflect.ClassTag
-import scala.util.Try
-
import org.apache.spark.{SparkException, SparkConf}
import org.apache.spark.util.Utils
@@ -93,7 +93,7 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- Client.populateClasspath(args, conf, sparkConf, env)
+ Client.populateClasspath(args, conf, sparkConf, env, true)
val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -104,13 +104,16 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
cp should not contain (uri.getPath())
}
})
- if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
- cp should contain("{{PWD}}")
- } else if (Utils.isWindows) {
- cp should contain("%PWD%")
- } else {
- cp should contain(Environment.PWD.$())
- }
+ val pwdVar =
+ if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+ "{{PWD}}"
+ } else if (Utils.isWindows) {
+ "%PWD%"
+ } else {
+ Environment.PWD.$()
+ }
+ cp should contain(pwdVar)
+ cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
}
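The version/platform probe factored out into pwdVar above is worth reading on its own: on Hadoop versions that expose Environment.$$(), YARN itself expands the {{PWD}} token on whatever platform the container lands on, while older versions need an explicit per-platform form chosen at submission time. A minimal standalone sketch of the same logic, with Spark's Utils.isWindows replaced by a plain flag purely for illustration:

import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

// Pick the right $PWD expansion: {{PWD}} where YARN can expand it itself
// (Hadoop versions providing Environment.$$()), otherwise an explicit
// Windows (%PWD%) or Unix form.
def pwdToken(isWindows: Boolean): String = {
  if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
    "{{PWD}}"
  } else if (isWindows) {
    "%PWD%"
  } else {
    Environment.PWD.$()
  }
}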
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a18c94d4ab..3877da4120 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -77,6 +77,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
+ private var hadoopConfDir: File = _
private var logConfDir: File = _
override def beforeAll() {
@@ -120,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+ hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+ assert(hadoopConfDir.mkdir())
+ File.createTempFile("token", ".txt", hadoopConfDir)
}
override def afterAll() {
@@ -258,7 +262,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
appArgs
Utils.executeAndGetOutput(argv,
- extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+ extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()))
}
/**
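The extraEnvironment change above is what makes the launched process pick up the test-managed config directory instead of the node's own. As a self-contained illustration (runWithConfDir is a hypothetical helper, not part of the suite; Utils.executeAndGetOutput is Spark's own equivalent):

import java.io.File
import scala.sys.process._

// Run a command with YARN_CONF_DIR pointing at a test-controlled directory,
// so the child process resolves its Hadoop/YARN configuration from there.
def runWithConfDir(argv: Seq[String], confDir: File): String =
  Process(argv, None, "YARN_CONF_DIR" -> confDir.getAbsolutePath()).!!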