aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-04-17 14:21:51 -0500
committerThomas Graves <tgraves@apache.org>2015-04-17 14:21:51 -0500
commit50ab8a6543ad5c31e89c16df374d0cb13222fd1e (patch)
treedba7e72a087d6c42748603cae816208511c5e5cd /yarn
parentc84d91692aa25c01882bcc3f9fd5de3cfa786195 (diff)
downloadspark-50ab8a6543ad5c31e89c16df374d0cb13222fd1e.tar.gz
spark-50ab8a6543ad5c31e89c16df374d0cb13222fd1e.tar.bz2
spark-50ab8a6543ad5c31e89c16df374d0cb13222fd1e.zip
[SPARK-2669] [yarn] Distribute client configuration to AM.
Currently, when Spark launches the Yarn AM, the process will use the local Hadoop configuration on the node where the AM launches, if one is present. A more correct approach is to use the same configuration used to launch the Spark job, since the user may have made modifications (such as adding app-specific configs). The approach taken here is to use the distributed cache to make all files in the Hadoop configuration directory available to the AM. This is a little overkill since only the AM needs them (the executors use the broadcast Hadoop configuration from the driver), but is the easier approach. Even though only a few files in that directory may end up being used, all of them are uploaded. This allows supporting use cases such as when auxiliary configuration files are used for SSL configuration, or when uploading a Hive configuration directory. Not all of these may be reflected in a o.a.h.conf.Configuration object, but may be needed when a driver in cluster mode instantiates, for example, a HiveConf object instead. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #4142 from vanzin/SPARK-2669 and squashes the following commits: f5434b9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669 013f0fb [Marcelo Vanzin] Review feedback. f693152 [Marcelo Vanzin] Le sigh. ed45b7d [Marcelo Vanzin] Zip all config files and upload them as an archive. 5927b6b [Marcelo Vanzin] Merge branch 'master' into SPARK-2669 cbb9fb3 [Marcelo Vanzin] Remove stale test. e3e58d0 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669 e3d0613 [Marcelo Vanzin] Review feedback. 34bdbd8 [Marcelo Vanzin] Fix test. 022a688 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669 a77ddd5 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669 79221c7 [Marcelo Vanzin] [SPARK-2669] [yarn] Distribute client configuration to AM.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala125
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala2
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala29
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala6
4 files changed, 127 insertions, 35 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 52e4dee46c..019afbd1a1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,15 +17,18 @@
package org.apache.spark.deploy.yarn
+import java.io.{File, FileOutputStream}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
+import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.reflect.runtime.universe
import scala.util.{Try, Success, Failure}
import com.google.common.base.Objects
+import com.google.common.io.Files
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.conf.Configuration
@@ -77,12 +80,6 @@ private[spark] class Client(
def stop(): Unit = yarnClient.stop()
- /* ------------------------------------------------------------------------------------- *
- | The following methods have much in common in the stable and alpha versions of Client, |
- | but cannot be implemented in the parent trait due to subtle API differences across |
- | hadoop versions. |
- * ------------------------------------------------------------------------------------- */
-
/**
* Submit an application running our ApplicationMaster to the ResourceManager.
*
@@ -223,6 +220,10 @@ private[spark] class Client(
val fs = FileSystem.get(hadoopConf)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
val nns = getNameNodesToAccess(sparkConf) + dst
+ // Used to keep track of URIs added to the distributed cache. If the same URI is added
+ // multiple times, YARN will fail to launch containers for the app with an internal
+ // error.
+ val distributedUris = new HashSet[String]
obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)
@@ -241,6 +242,17 @@ private[spark] class Client(
"for alternatives.")
}
+ def addDistributedUri(uri: URI): Boolean = {
+ val uriStr = uri.toString()
+ if (distributedUris.contains(uriStr)) {
+ logWarning(s"Resource $uri added multiple times to distributed cache.")
+ false
+ } else {
+ distributedUris += uriStr
+ true
+ }
+ }
+
/**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
@@ -258,11 +270,13 @@ private[spark] class Client(
if (!localPath.isEmpty()) {
val localURI = new URI(localPath)
if (localURI.getScheme != LOCAL_SCHEME) {
- val src = getQualifiedLocalPath(localURI, hadoopConf)
- val destPath = copyFileToRemote(dst, src, replication)
- val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
- distCacheMgr.addResource(destFs, hadoopConf, destPath,
- localResources, LocalResourceType.FILE, destName, statCache)
+ if (addDistributedUri(localURI)) {
+ val src = getQualifiedLocalPath(localURI, hadoopConf)
+ val destPath = copyFileToRemote(dst, src, replication)
+ val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+ distCacheMgr.addResource(destFs, hadoopConf, destPath,
+ localResources, LocalResourceType.FILE, destName, statCache)
+ }
} else if (confKey != null) {
// If the resource is intended for local use only, handle this downstream
// by setting the appropriate property
@@ -271,6 +285,13 @@ private[spark] class Client(
}
}
+ createConfArchive().foreach { file =>
+ require(addDistributedUri(file.toURI()))
+ val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
+ distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
+ LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
+ }
+
/**
* Do the same for any additional resources passed in through ClientArguments.
* Each resource category is represented by a 3-tuple of:
@@ -288,13 +309,15 @@ private[spark] class Client(
flist.split(',').foreach { file =>
val localURI = new URI(file.trim())
if (localURI.getScheme != LOCAL_SCHEME) {
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyFileToRemote(dst, localPath, replication)
- distCacheMgr.addResource(
- fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
- if (addToClasspath) {
- cachedSecondaryJarLinks += linkname
+ if (addDistributedUri(localURI)) {
+ val localPath = new Path(localURI)
+ val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+ val destPath = copyFileToRemote(dst, localPath, replication)
+ distCacheMgr.addResource(
+ fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
+ if (addToClasspath) {
+ cachedSecondaryJarLinks += linkname
+ }
}
} else if (addToClasspath) {
// Resource is intended for local use only and should be added to the class path
@@ -311,13 +334,64 @@ private[spark] class Client(
}
/**
+ * Create an archive with the Hadoop config files for distribution.
+ *
+ * These are only used by the AM, since executors will use the configuration object broadcast by
+ * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
+ * it when distributing to the AM. This directory is then added to the classpath of the AM
+ * process, just to make sure that everybody is using the same default config.
+ *
+ * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
+ * shows up in the classpath before YARN_CONF_DIR.
+ *
+ * Currently this makes a shallow copy of the conf directory. If there are cases where a
+ * Hadoop config directory contains subdirectories, this code will have to be fixed.
+ */
+ private def createConfArchive(): Option[File] = {
+ val hadoopConfFiles = new HashMap[String, File]()
+ Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
+ sys.env.get(envKey).foreach { path =>
+ val dir = new File(path)
+ if (dir.isDirectory()) {
+ dir.listFiles().foreach { file =>
+ if (!hadoopConfFiles.contains(file.getName())) {
+ hadoopConfFiles(file.getName()) = file
+ }
+ }
+ }
+ }
+ }
+
+ if (!hadoopConfFiles.isEmpty) {
+ val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
+ new File(Utils.getLocalDir(sparkConf)))
+
+ val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
+ try {
+ hadoopConfStream.setLevel(0)
+ hadoopConfFiles.foreach { case (name, file) =>
+ hadoopConfStream.putNextEntry(new ZipEntry(name))
+ Files.copy(file, hadoopConfStream)
+ hadoopConfStream.closeEntry()
+ }
+ } finally {
+ hadoopConfStream.close()
+ }
+
+ Some(hadoopConfArchive)
+ } else {
+ None
+ }
+ }
+
+ /**
* Set up the environment for launching our ApplicationMaster container.
*/
private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
- populateClasspath(args, yarnConf, sparkConf, env, extraCp)
+ populateClasspath(args, yarnConf, sparkConf, env, true, extraCp)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -718,6 +792,9 @@ object Client extends Logging {
// Distribution-defined classpath to add to processes
val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
+ // Subdirectory where the user's hadoop config files will be placed.
+ val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+
/**
* Find the user-defined Spark jar if configured, or return the jar containing this
* class if not.
@@ -831,11 +908,19 @@ object Client extends Logging {
conf: Configuration,
sparkConf: SparkConf,
env: HashMap[String, String],
+ isAM: Boolean,
extraClassPath: Option[String] = None): Unit = {
extraClassPath.foreach(addClasspathEntry(_, env))
addClasspathEntry(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
)
+
+ if (isAM) {
+ addClasspathEntry(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+ LOCALIZED_HADOOP_CONF_DIR, env)
+ }
+
if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
val userClassPath =
if (args != null) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index b06069c07f..9d04d241da 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -277,7 +277,7 @@ class ExecutorRunnable(
private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
- Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
+ Client.populateClasspath(null, yarnConf, sparkConf, env, false, extraCp)
sparkConf.getExecutorEnv.foreach { case (key, value) =>
// This assumes each executor environment variable set here is a path
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index c1b94ac9c5..a51c2005cb 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -20,6 +20,11 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.reflect.ClassTag
+import scala.util.Try
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
@@ -30,11 +35,6 @@ import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap => MutableHashMap }
-import scala.reflect.ClassTag
-import scala.util.Try
-
import org.apache.spark.{SparkException, SparkConf}
import org.apache.spark.util.Utils
@@ -93,7 +93,7 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- Client.populateClasspath(args, conf, sparkConf, env)
+ Client.populateClasspath(args, conf, sparkConf, env, true)
val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -104,13 +104,16 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
cp should not contain (uri.getPath())
}
})
- if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
- cp should contain("{{PWD}}")
- } else if (Utils.isWindows) {
- cp should contain("%PWD%")
- } else {
- cp should contain(Environment.PWD.$())
- }
+ val pwdVar =
+ if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+ "{{PWD}}"
+ } else if (Utils.isWindows) {
+ "%PWD%"
+ } else {
+ Environment.PWD.$()
+ }
+ cp should contain(pwdVar)
+ cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a18c94d4ab..3877da4120 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -77,6 +77,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
+ private var hadoopConfDir: File = _
private var logConfDir: File = _
override def beforeAll() {
@@ -120,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+ hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+ assert(hadoopConfDir.mkdir())
+ File.createTempFile("token", ".txt", hadoopConfDir)
}
override def afterAll() {
@@ -258,7 +262,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
appArgs
Utils.executeAndGetOutput(argv,
- extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+ extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()))
}
/**