-rw-r--r--  docs/running-on-yarn.md                                                  |   6
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala            | 125
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala  |   2
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala       |  29
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala  |   6
5 files changed, 132 insertions(+), 36 deletions(-)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 853c9f26b0..0968fc5ad6 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -211,7 +211,11 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
# Launching Spark on YARN
Ensure that `HADOOP_CONF_DIR` or `YARN_CONF_DIR` points to the directory which contains the (client side) configuration files for the Hadoop cluster.
-These configs are used to write to the dfs and connect to the YARN ResourceManager.
+These configs are used to write to HDFS and connect to the YARN ResourceManager. The
+configuration contained in this directory will be distributed to the YARN cluster so that all
+containers used by the application use the same configuration. If the configuration references
+Java system properties or environment variables not managed by YARN, they should also be set in the
+Spark application's configuration (driver, executors, and the AM when running in client mode).
There are two deploy modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
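To make the new paragraph's last point concrete, a hedged sketch in Scala (the property name `custom.tmp.dir` is hypothetical; the `spark.*` keys are standard Spark options) of propagating a Java system property that the distributed Hadoop configuration refers to:

    import org.apache.spark.SparkConf

    // Sketch: a system property referenced by the distributed Hadoop config
    // must be set for every process that loads that config. The property
    // name below is hypothetical.
    val conf = new SparkConf()
      .set("spark.driver.extraJavaOptions", "-Dcustom.tmp.dir=/data/tmp")    // driver
      .set("spark.executor.extraJavaOptions", "-Dcustom.tmp.dir=/data/tmp")  // executors
      .set("spark.yarn.am.extraJavaOptions", "-Dcustom.tmp.dir=/data/tmp")   // AM (client mode)

In yarn-cluster mode the driver runs inside the AM, so `spark.driver.extraJavaOptions` covers the AM there; `spark.yarn.am.extraJavaOptions` only applies in client mode.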
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 52e4dee46c..019afbd1a1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,15 +17,18 @@
package org.apache.spark.deploy.yarn
+import java.io.{File, FileOutputStream}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
+import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.reflect.runtime.universe
import scala.util.{Try, Success, Failure}
import com.google.common.base.Objects
+import com.google.common.io.Files
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.conf.Configuration
@@ -77,12 +80,6 @@ private[spark] class Client(
def stop(): Unit = yarnClient.stop()
- /* ------------------------------------------------------------------------------------- *
- | The following methods have much in common in the stable and alpha versions of Client, |
- | but cannot be implemented in the parent trait due to subtle API differences across |
- | hadoop versions. |
- * ------------------------------------------------------------------------------------- */
-
/**
* Submit an application running our ApplicationMaster to the ResourceManager.
*
@@ -223,6 +220,10 @@ private[spark] class Client(
val fs = FileSystem.get(hadoopConf)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
val nns = getNameNodesToAccess(sparkConf) + dst
+ // Used to keep track of URIs added to the distributed cache. If the same URI is added
+ // multiple times, YARN will fail to launch containers for the app with an internal
+ // error.
+ val distributedUris = new HashSet[String]
obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)
@@ -241,6 +242,17 @@ private[spark] class Client(
"for alternatives.")
}
+ def addDistributedUri(uri: URI): Boolean = {
+ val uriStr = uri.toString()
+ if (distributedUris.contains(uriStr)) {
+ logWarning(s"Resource $uri added multiple times to distributed cache.")
+ false
+ } else {
+ distributedUris += uriStr
+ true
+ }
+ }
+
/**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
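Review note on the helper above: mutable `HashSet.add` already returns whether the element was newly inserted, so the check and the update can be collapsed. A minimal equivalent sketch (standalone, with `println` standing in for `logWarning`):

    import java.net.URI
    import scala.collection.mutable.HashSet

    val distributedUris = new HashSet[String]

    // HashSet.add returns false when the string is already present, which is
    // exactly the duplicate signal the patch warns about.
    def addDistributedUri(uri: URI): Boolean = {
      val added = distributedUris.add(uri.toString())
      if (!added) println(s"Resource $uri added multiple times to distributed cache.")
      added
    }

The patch's explicit contains/`+=` form keeps the warning wording next to the check, so this is purely a stylistic alternative.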
@@ -258,11 +270,13 @@ private[spark] class Client(
if (!localPath.isEmpty()) {
val localURI = new URI(localPath)
if (localURI.getScheme != LOCAL_SCHEME) {
- val src = getQualifiedLocalPath(localURI, hadoopConf)
- val destPath = copyFileToRemote(dst, src, replication)
- val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
- distCacheMgr.addResource(destFs, hadoopConf, destPath,
- localResources, LocalResourceType.FILE, destName, statCache)
+ if (addDistributedUri(localURI)) {
+ val src = getQualifiedLocalPath(localURI, hadoopConf)
+ val destPath = copyFileToRemote(dst, src, replication)
+ val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+ distCacheMgr.addResource(destFs, hadoopConf, destPath,
+ localResources, LocalResourceType.FILE, destName, statCache)
+ }
} else if (confKey != null) {
// If the resource is intended for local use only, handle this downstream
// by setting the appropriate property
@@ -271,6 +285,13 @@ private[spark] class Client(
}
}
+ createConfArchive().foreach { file =>
+ require(addDistributedUri(file.toURI()))
+ val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
+ distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
+ LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
+ }
+
/**
* Do the same for any additional resources passed in through ClientArguments.
* Each resource category is represented by a 3-tuple of:
@@ -288,13 +309,15 @@ private[spark] class Client(
flist.split(',').foreach { file =>
val localURI = new URI(file.trim())
if (localURI.getScheme != LOCAL_SCHEME) {
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyFileToRemote(dst, localPath, replication)
- distCacheMgr.addResource(
- fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
- if (addToClasspath) {
- cachedSecondaryJarLinks += linkname
+ if (addDistributedUri(localURI)) {
+ val localPath = new Path(localURI)
+ val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+ val destPath = copyFileToRemote(dst, localPath, replication)
+ distCacheMgr.addResource(
+ fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
+ if (addToClasspath) {
+ cachedSecondaryJarLinks += linkname
+ }
}
} else if (addToClasspath) {
// Resource is intended for local use only and should be added to the class path
@@ -311,13 +334,64 @@ private[spark] class Client(
}
/**
+ * Create an archive with the Hadoop config files for distribution.
+ *
+ * These are only used by the AM, since executors will use the configuration object broadcast by
+ * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
+ * it when distributing to the AM. This directory is then added to the classpath of the AM
+ * process, just to make sure that everybody is using the same default config.
+ *
+ * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
+ * shows up in the classpath before YARN_CONF_DIR.
+ *
+ * Currently this makes a shallow copy of the conf directory. If there are cases where a
+ * Hadoop config directory contains subdirectories, this code will have to be fixed.
+ */
+ private def createConfArchive(): Option[File] = {
+ val hadoopConfFiles = new HashMap[String, File]()
+ Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
+ sys.env.get(envKey).foreach { path =>
+ val dir = new File(path)
+ if (dir.isDirectory()) {
+ dir.listFiles().foreach { file =>
+ if (!hadoopConfFiles.contains(file.getName())) {
+ hadoopConfFiles(file.getName()) = file
+ }
+ }
+ }
+ }
+ }
+
+ if (!hadoopConfFiles.isEmpty) {
+ val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
+ new File(Utils.getLocalDir(sparkConf)))
+
+ val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
+ try {
+ hadoopConfStream.setLevel(0)
+ hadoopConfFiles.foreach { case (name, file) =>
+ hadoopConfStream.putNextEntry(new ZipEntry(name))
+ Files.copy(file, hadoopConfStream)
+ hadoopConfStream.closeEntry()
+ }
+ } finally {
+ hadoopConfStream.close()
+ }
+
+ Some(hadoopConfArchive)
+ } else {
+ None
+ }
+ }
+
+ /**
* Set up the environment for launching our ApplicationMaster container.
*/
private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
- populateClasspath(args, yarnConf, sparkConf, env, extraCp)
+ populateClasspath(args, yarnConf, sparkConf, env, true, extraCp)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -718,6 +792,9 @@ object Client extends Logging {
// Distribution-defined classpath to add to processes
val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
+ // Subdirectory where the user's hadoop config files will be placed.
+ val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+
/**
* Find the user-defined Spark jar if configured, or return the jar containing this
* class if not.
@@ -831,11 +908,19 @@ object Client extends Logging {
conf: Configuration,
sparkConf: SparkConf,
env: HashMap[String, String],
+ isAM: Boolean,
extraClassPath: Option[String] = None): Unit = {
extraClassPath.foreach(addClasspathEntry(_, env))
addClasspathEntry(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
)
+
+ if (isAM) {
+ addClasspathEntry(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+ LOCALIZED_HADOOP_CONF_DIR, env)
+ }
+
if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
val userClassPath =
if (args != null) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index b06069c07f..9d04d241da 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -277,7 +277,7 @@ class ExecutorRunnable(
private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
- Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
+ Client.populateClasspath(null, yarnConf, sparkConf, env, false, extraCp)
sparkConf.getExecutorEnv.foreach { case (key, value) =>
// This assumes each executor environment variable set here is a path
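Review note: after this change the two `populateClasspath` call sites differ only in the new `isAM` flag. A side-by-side sketch, reusing the names in scope at each call site:

    // AM / driver container (Client.setupLaunchEnv): isAM = true, so CLASSPATH
    // gains {{PWD}}/__hadoop_conf__ and the AM sees the localized Hadoop config.
    Client.populateClasspath(args, yarnConf, sparkConf, env, true, extraCp)

    // Executor containers (ExecutorRunnable.prepareEnvironment): isAM = false;
    // executors rely on the configuration broadcast by the driver instead.
    Client.populateClasspath(null, yarnConf, sparkConf, env, false, extraCp)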
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index c1b94ac9c5..a51c2005cb 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -20,6 +20,11 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.reflect.ClassTag
+import scala.util.Try
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
@@ -30,11 +35,6 @@ import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap => MutableHashMap }
-import scala.reflect.ClassTag
-import scala.util.Try
-
import org.apache.spark.{SparkException, SparkConf}
import org.apache.spark.util.Utils
@@ -93,7 +93,7 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- Client.populateClasspath(args, conf, sparkConf, env)
+ Client.populateClasspath(args, conf, sparkConf, env, true)
val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -104,13 +104,16 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
cp should not contain (uri.getPath())
}
})
- if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
- cp should contain("{{PWD}}")
- } else if (Utils.isWindows) {
- cp should contain("%PWD%")
- } else {
- cp should contain(Environment.PWD.$())
- }
+ val pwdVar =
+ if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+ "{{PWD}}"
+ } else if (Utils.isWindows) {
+ "%PWD%"
+ } else {
+ Environment.PWD.$()
+ }
+ cp should contain(pwdVar)
+ cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a18c94d4ab..3877da4120 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -77,6 +77,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
+ private var hadoopConfDir: File = _
private var logConfDir: File = _
override def beforeAll() {
@@ -120,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+ hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+ assert(hadoopConfDir.mkdir())
+ File.createTempFile("token", ".txt", hadoopConfDir)
}
override def afterAll() {
@@ -258,7 +262,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
appArgs
Utils.executeAndGetOutput(argv,
- extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+ extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()))
}
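Review note: pointing `YARN_CONF_DIR` at a directory seeded with a `token*.txt` marker file means every application launched by this suite exercises `createConfArchive()`. A hypothetical AM-side assertion one could build on top (not part of the patch):

    import java.io.File

    // In a container, the archive is exploded into $PWD/__hadoop_conf__, so the
    // marker file from the test's conf dir should be visible there.
    val confDir = new File(Client.LOCALIZED_HADOOP_CONF_DIR)
    assert(confDir.isDirectory() && confDir.listFiles().exists(_.getName().startsWith("token")))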
/**