path: root/yarn
author     Marcelo Vanzin <vanzin@cloudera.com>  2016-04-20 16:57:23 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>  2016-04-20 16:57:23 -0700
commit     f47dbf27fa034629fab12d0f3c89ab75edb03f86 (patch)
tree       2445f305b0ef90e5577ec0bc608f1cf68f28960e /yarn
parent     334c293ec0bcc2195d502c574ca40dbc4769d666 (diff)
download   spark-f47dbf27fa034629fab12d0f3c89ab75edb03f86.tar.gz
           spark-f47dbf27fa034629fab12d0f3c89ab75edb03f86.tar.bz2
           spark-f47dbf27fa034629fab12d0f3c89ab75edb03f86.zip
[SPARK-14602][YARN] Use SparkConf to propagate the list of cached files.
This change avoids using the environment to pass this information, since with many jars it's easy to hit limits on certain OSes. Instead, it encodes the information into the Spark configuration propagated to the AM.

The first problem that needed to be solved is a chicken & egg issue: the config file is distributed using the cache, and it needs to contain information about the files that are being distributed. To solve that, the code now treats the config archive specially, and uses slightly different code to distribute it, so that only its cache path needs to be saved to the config file.

The second problem is that the extra information would show up in the Web UI, which made the environment tab even more noisy than it already is when lots of jars are listed. This is solved by two changes: the list of cached files is now read only once in the AM, and propagated down to the ExecutorRunnable code (which actually sends the list to the NMs when starting containers). The second change is to unset those config entries after the list is read, so that the SparkContext never sees them.

Tested with both client and cluster mode by running "run-example SparkPi". This uploads a whole lot of files when run from a build dir (instead of a distribution, where the list is cleaned up), and I verified that the configs do not show up in the UI.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #12487 from vanzin/SPARK-14602.
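As a rough illustration of the propagation described above (not code from the patch): the cached-file metadata travels as ordinary SparkConf entries and is dropped again once the AM has read it. The string keys match the entries added in config.scala below; the values and the manual comma handling are invented for the example, since the patch itself goes through internal typed ConfigEntry objects.

    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)

    // Client side: record one value per cached file, as parallel lists.
    conf.set("spark.yarn.cache.filenames",
      "hdfs:/cache/app.jar#app.jar,hdfs:/cache/extra.jar#extra.jar")
    conf.set("spark.yarn.cache.sizes", "1024,2048")
    conf.set("spark.yarn.cache.timestamps", "1461196643000,1461196644000")
    conf.set("spark.yarn.cache.visibilities", "PRIVATE,PRIVATE")
    conf.set("spark.yarn.cache.types", "FILE,FILE")

    // AM side: read the lists once while building the LocalResource map...
    val files = conf.get("spark.yarn.cache.filenames").split(',')
    val sizes = conf.get("spark.yarn.cache.sizes").split(',').map(_.toLong)

    // ...then remove the entries so the SparkContext (and the UI) never sees them.
    Seq("spark.yarn.cache.filenames", "spark.yarn.cache.sizes",
        "spark.yarn.cache.timestamps", "spark.yarn.cache.visibilities",
        "spark.yarn.cache.types").foreach(k => conf.remove(k))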
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                   62
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                              42
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala       79
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala                    58
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                        6
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala                         9
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala                              50
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala  84
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala                         17
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala                   3
10 files changed, 235 insertions(+), 175 deletions(-)
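The chicken & egg part — the conf archive carrying its own cache location — can be sketched along these lines (illustrative only; the staging path is invented, and the real logic lives in Client.prepareLocalResources and the ApplicationMaster constructor in the diffs below):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.SparkConf

    val hadoopConf = new Configuration()
    val sparkConf = new SparkConf(false)
    val stagingDir = new Path("hdfs:///user/someuser/.sparkStaging/application_0001")

    // Client: fix the archive's remote name up front and record only that path,
    // *before* the configuration is written into the archive and uploaded.
    val confArchive = new Path(stagingDir, "__spark_conf__.zip")
    sparkConf.set("spark.yarn.cache.confArchive", confArchive.toString)
    // ... write sparkConf into a local zip, then upload it to `confArchive`
    // (the patch forces the copy via copyFileToRemote(..., force = true)).

    // AM: the recorded path is enough to look up the archive and add it to the
    // containers' distributed cache (this step needs a reachable filesystem).
    val uri = new URI(sparkConf.get("spark.yarn.cache.confArchive"))
    val fs = FileSystem.get(uri, hadoopConf)
    val status = fs.getFileStatus(new Path(uri))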
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d447a59937..5bb63500c8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,15 +19,17 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException
-import java.net.{Socket, URL}
+import java.net.{Socket, URI, URL}
import java.util.concurrent.atomic.AtomicReference
+import scala.collection.mutable.HashMap
import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -120,6 +122,61 @@ private[spark] class ApplicationMaster(
private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
+ // Load the list of localized files set by the client. This is used when launching executors,
+ // and is loaded here so that these configs don't pollute the Web UI's environment page in
+ // cluster mode.
+ private val localResources = {
+ logInfo("Preparing Local resources")
+ val resources = HashMap[String, LocalResource]()
+
+ def setupDistributedCache(
+ file: String,
+ rtype: LocalResourceType,
+ timestamp: String,
+ size: String,
+ vis: String): Unit = {
+ val uri = new URI(file)
+ val amJarRsrc = Records.newRecord(classOf[LocalResource])
+ amJarRsrc.setType(rtype)
+ amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+ amJarRsrc.setTimestamp(timestamp.toLong)
+ amJarRsrc.setSize(size.toLong)
+
+ val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
+ resources(fileName) = amJarRsrc
+ }
+
+ val distFiles = sparkConf.get(CACHED_FILES)
+ val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
+ val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
+ val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
+ val resTypes = sparkConf.get(CACHED_FILES_TYPES)
+
+ for (i <- 0 to distFiles.size - 1) {
+ val resType = LocalResourceType.valueOf(resTypes(i))
+ setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
+ visibilities(i))
+ }
+
+ // Distribute the conf archive to executors.
+ sparkConf.get(CACHED_CONF_ARCHIVE).foreach { uri =>
+ val fs = FileSystem.get(new URI(uri), yarnConf)
+ val status = fs.getFileStatus(new Path(uri))
+ setupDistributedCache(uri, LocalResourceType.ARCHIVE, status.getModificationTime().toString,
+ status.getLen.toString, LocalResourceVisibility.PRIVATE.name())
+ }
+
+ // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
+ CACHE_CONFIGS.foreach { e =>
+ sparkConf.remove(e)
+ sys.props.remove(e.key)
+ }
+
+ logInfo("Prepared Local resources " + resources)
+ resources.toMap
+ }
+
def getAttemptId(): ApplicationAttemptId = {
client.getAttemptId()
}
@@ -292,7 +349,8 @@ private[spark] class ApplicationMaster(
_sparkConf,
uiAddress,
historyAddress,
- securityMgr)
+ securityMgr,
+ localResources)
allocator.allocateResources()
reporterThread = launchReporterThread()
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ae5fb6bbd4..8b07dc3af4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -328,12 +328,14 @@ private[spark] class Client(
private[yarn] def copyFileToRemote(
destDir: Path,
srcPath: Path,
- replication: Short): Path = {
+ replication: Short,
+ force: Boolean = false,
+ destName: Option[String] = None): Path = {
val destFs = destDir.getFileSystem(hadoopConf)
val srcFs = srcPath.getFileSystem(hadoopConf)
var destPath = srcPath
- if (!compareFs(srcFs, destFs)) {
- destPath = new Path(destDir, srcPath.getName())
+ if (force || !compareFs(srcFs, destFs)) {
+ destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
logInfo(s"Uploading resource $srcPath -> $destPath")
FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
destFs.setReplication(destPath, replication)
@@ -553,12 +555,37 @@ private[spark] class Client(
distribute(f, targetDir = targetDir)
}
- // Distribute an archive with Hadoop and Spark configuration for the AM and executors.
+ // Update the configuration with all the distributed files, minus the conf archive. The
+ // conf archive will be handled by the AM differently so that we avoid having to send
+ // this configuration by other means. See SPARK-14602 for one reason of why this is needed.
+ distCacheMgr.updateConfiguration(sparkConf)
+
+ // Upload the conf archive to HDFS manually, and record its location in the configuration.
+ // This will allow the AM to know where the conf archive is in HDFS, so that it can be
+ // distributed to the containers.
+ //
+ // This code forces the archive to be copied, so that unit tests pass (since in that case both
+ // file systems are the same and the archive wouldn't normally be copied). In most (all?)
+ // deployments, the archive would be copied anyway, since it's a temp file in the local file
+ // system.
+ val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
+ val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
+ sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
+
+ val localConfArchive = new Path(createConfArchive().toURI())
+ copyFileToRemote(destDir, localConfArchive, replication, force = true,
+ destName = Some(LOCALIZED_CONF_ARCHIVE))
+
val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_CONF_DIR))
require(confLocalizedPath != null)
+ // Clear the cache-related entries from the configuration to avoid them polluting the
+ // UI's environment page. This works for client mode; for cluster mode, this is handled
+ // by the AM.
+ CACHE_CONFIGS.foreach(sparkConf.remove)
+
localResources
}
@@ -787,10 +814,6 @@ private[spark] class Client(
val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
- // Set the environment variables to be passed on to the executors.
- distCacheMgr.setDistFilesEnv(launchEnv)
- distCacheMgr.setDistArchivesEnv(launchEnv)
-
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
@@ -1150,6 +1173,9 @@ private object Client extends Logging {
// Subdirectory where the user's Spark and Hadoop config files will be placed.
val LOCALIZED_CONF_DIR = "__spark_conf__"
+ // File containing the conf archive in the AM. See prepareLocalResources().
+ val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"
+
// Name of the file in the conf archive containing Spark configuration.
val SPARK_CONF_FILE = "__spark_conf__.properties"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 869edf6c5b..dcc2288dd1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn
import java.net.URI
-import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -27,17 +27,21 @@ import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
+private case class CacheEntry(
+ uri: URI,
+ size: Long,
+ modTime: Long,
+ visibility: LocalResourceVisibility,
+ resType: LocalResourceType)
+
/** Client side methods to setup the Hadoop distributed cache */
private[spark] class ClientDistributedCacheManager() extends Logging {
- // Mappings from remote URI to (file status, modification time, visibility)
- private val distCacheFiles: Map[String, (String, String, String)] =
- LinkedHashMap[String, (String, String, String)]()
- private val distCacheArchives: Map[String, (String, String, String)] =
- LinkedHashMap[String, (String, String, String)]()
-
+ private val distCacheEntries = new ListBuffer[CacheEntry]()
/**
* Add a resource to the list of distributed cache resources. This list can
@@ -72,61 +76,33 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
- if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
+ require(link != null && link.nonEmpty, "You must specify a valid link name.")
localResources(link) = amJarRsrc
if (!appMasterOnly) {
val uri = destPath.toUri()
val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
- if (resourceType == LocalResourceType.FILE) {
- distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
- destStatus.getModificationTime().toString(), visibility.name())
- } else {
- distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
- destStatus.getModificationTime().toString(), visibility.name())
- }
+ distCacheEntries += CacheEntry(pathURI, destStatus.getLen(), destStatus.getModificationTime(),
+ visibility, resourceType)
}
}
/**
- * Adds the necessary cache file env variables to the env passed in
+ * Writes down information about cached files needed in executors to the given configuration.
*/
- def setDistFilesEnv(env: Map[String, String]): Unit = {
- val (keys, tupleValues) = distCacheFiles.unzip
- val (sizes, timeStamps, visibilities) = tupleValues.unzip3
- if (keys.size > 0) {
- env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
- timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
- sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
- visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
- }
- }
-
- /**
- * Adds the necessary cache archive env variables to the env passed in
- */
- def setDistArchivesEnv(env: Map[String, String]): Unit = {
- val (keys, tupleValues) = distCacheArchives.unzip
- val (sizes, timeStamps, visibilities) = tupleValues.unzip3
- if (keys.size > 0) {
- env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
- timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
- sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
- visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
- }
+ def updateConfiguration(conf: SparkConf): Unit = {
+ conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString))
+ conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size))
+ conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime))
+ conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()))
+ conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()))
}
/**
* Returns the local resource visibility depending on the cache file permissions
* @return LocalResourceVisibility
*/
- def getVisibility(
+ private[yarn] def getVisibility(
conf: Configuration,
uri: URI,
statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
@@ -141,7 +117,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* Returns a boolean to denote whether a cache file is visible to all (public)
* @return true if the path in the uri is visible to all, false otherwise
*/
- def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
+ private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
val fs = FileSystem.get(uri, conf)
val current = new Path(uri.getPath())
// the leaf level file should be readable by others
@@ -157,7 +133,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* the directory hierarchy to the given path)
* @return true if all ancestors have the 'execute' permission set for all users
*/
- def ancestorsHaveExecutePermissions(
+ private def ancestorsHaveExecutePermissions(
fs: FileSystem,
path: Path,
statCache: Map[URI, FileStatus]): Boolean = {
@@ -177,7 +153,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* imply the permission in the passed FsAction
* @return true if the path in the uri is visible to all, false otherwise
*/
- def checkPermissionOfOther(
+ private def checkPermissionOfOther(
fs: FileSystem,
path: Path,
action: FsAction,
@@ -194,7 +170,10 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* it in the cache, and returns the FileStatus.
* @return FileStatus
*/
- def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
+ private[yarn] def getFileStatus(
+ fs: FileSystem,
+ uri: URI,
+ statCache: Map[URI, FileStatus]): FileStatus = {
val stat = statCache.get(uri) match {
case Some(existstat) => existstat
case None =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ef7908a3ef..3d370e6d71 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.yarn
import java.io.File
-import java.net.URI
import java.nio.ByteBuffer
import java.util.Collections
@@ -55,7 +54,8 @@ private[yarn] class ExecutorRunnable(
executorMemory: Int,
executorCores: Int,
appId: String,
- securityMgr: SecurityManager)
+ securityMgr: SecurityManager,
+ localResources: Map[String, LocalResource])
extends Runnable with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
@@ -77,9 +77,7 @@ private[yarn] class ExecutorRunnable(
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
- val localResources = prepareLocalResources
ctx.setLocalResources(localResources.asJava)
-
ctx.setEnvironment(env.asJava)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
@@ -88,7 +86,7 @@ private[yarn] class ExecutorRunnable(
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- appId, localResources)
+ appId)
logInfo(s"""
|===============================================================================
@@ -136,8 +134,7 @@ private[yarn] class ExecutorRunnable(
hostname: String,
executorMemory: Int,
executorCores: Int,
- appId: String,
- localResources: HashMap[String, LocalResource]): List[String] = {
+ appId: String): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
@@ -239,53 +236,6 @@ private[yarn] class ExecutorRunnable(
commands.map(s => if (s == null) "null" else s).toList
}
- private def setupDistributedCache(
- file: String,
- rtype: LocalResourceType,
- localResources: HashMap[String, LocalResource],
- timestamp: String,
- size: String,
- vis: String): Unit = {
- val uri = new URI(file)
- val amJarRsrc = Records.newRecord(classOf[LocalResource])
- amJarRsrc.setType(rtype)
- amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
- amJarRsrc.setTimestamp(timestamp.toLong)
- amJarRsrc.setSize(size.toLong)
- localResources(uri.getFragment()) = amJarRsrc
- }
-
- private def prepareLocalResources: HashMap[String, LocalResource] = {
- logInfo("Preparing Local resources")
- val localResources = HashMap[String, LocalResource]()
-
- if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
- for( i <- 0 to distFiles.length - 1) {
- setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
- fileSizes(i), visibilities(i))
- }
- }
-
- if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
- val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
- for( i <- 0 to distArchives.length - 1) {
- setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
- timeStamps(i), fileSizes(i), visibilities(i))
- }
- }
-
- logInfo("Prepared Local resources " + localResources)
- localResources
- }
-
private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 23742eab62..b59e6cff2f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -63,7 +63,8 @@ private[yarn] class YarnAllocator(
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
- securityMgr: SecurityManager)
+ securityMgr: SecurityManager,
+ localResources: Map[String, LocalResource])
extends Logging {
import YarnAllocator._
@@ -477,7 +478,8 @@ private[yarn] class YarnAllocator(
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
- securityMgr)
+ securityMgr,
+ localResources)
if (launchContainers) {
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
driverUrl, executorHostname))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index e7f7544664..53df11eb66 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy.yarn
import java.util.{List => JList}
import scala.collection.JavaConverters._
-import scala.collection.Map
import scala.util.Try
import org.apache.hadoop.conf.Configuration
@@ -52,6 +51,8 @@ private[spark] class YarnRMClient extends Logging {
* @param sparkConf The Spark configuration.
* @param uiAddress Address of the SparkUI.
* @param uiHistoryAddress Address of the application on the History Server.
+ * @param securityMgr The security manager.
+ * @param localResources Map with information about files distributed via YARN's cache.
*/
def register(
driverUrl: String,
@@ -60,7 +61,8 @@ private[spark] class YarnRMClient extends Logging {
sparkConf: SparkConf,
uiAddress: String,
uiHistoryAddress: String,
- securityMgr: SecurityManager
+ securityMgr: SecurityManager,
+ localResources: Map[String, LocalResource]
): YarnAllocator = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
@@ -72,7 +74,8 @@ private[spark] class YarnRMClient extends Logging {
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
- new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr)
+ new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
+ localResources)
}
/**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index edfbfc5d58..3816a84ab2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -261,4 +261,54 @@ package object config {
.stringConf
.toSequence
.createOptional
+
+ /* Configuration and cached file propagation. */
+
+ private[spark] val CACHED_FILES = ConfigBuilder("spark.yarn.cache.filenames")
+ .internal()
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ private[spark] val CACHED_FILES_SIZES = ConfigBuilder("spark.yarn.cache.sizes")
+ .internal()
+ .longConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ private[spark] val CACHED_FILES_TIMESTAMPS = ConfigBuilder("spark.yarn.cache.timestamps")
+ .internal()
+ .longConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ private[spark] val CACHED_FILES_VISIBILITIES = ConfigBuilder("spark.yarn.cache.visibilities")
+ .internal()
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ // Either "file" or "archive", for each file.
+ private[spark] val CACHED_FILES_TYPES = ConfigBuilder("spark.yarn.cache.types")
+ .internal()
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ // The location of the conf archive in HDFS.
+ private[spark] val CACHED_CONF_ARCHIVE = ConfigBuilder("spark.yarn.cache.confArchive")
+ .internal()
+ .stringConf
+ .createOptional
+
+ // The list of cache-related config entries. This is used by Client and the AM to clean
+ // up the environment so that these settings do not appear on the web UI.
+ private[yarn] val CACHE_CONFIGS = Seq(
+ CACHED_FILES,
+ CACHED_FILES_SIZES,
+ CACHED_FILES_TIMESTAMPS,
+ CACHED_FILES_VISIBILITIES,
+ CACHED_FILES_TYPES,
+ CACHED_CONF_ARCHIVE)
+
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index ac8f663df2..b696e080ce 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar {
@@ -84,18 +85,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 0)
assert(resource.getType() === LocalResourceType.FILE)
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+ val sparkConf = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf)
+ assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
+ assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(0L))
+ assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(0L))
+ assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
+ assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.FILE.name()))
// add another one and verify both there and order correct
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
@@ -111,20 +107,22 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource2.getSize() === 20)
assert(resource2.getType() === LocalResourceType.FILE)
- val env2 = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env2)
- val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val files = env2("SPARK_YARN_CACHE_FILES").split(',')
- val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
+ val sparkConf2 = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf2)
+
+ val files = sparkConf2.get(CACHED_FILES)
+ val sizes = sparkConf2.get(CACHED_FILES_SIZES)
+ val timestamps = sparkConf2.get(CACHED_FILES_TIMESTAMPS)
+ val visibilities = sparkConf2.get(CACHED_FILES_VISIBILITIES)
+
assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(timestamps(0) === "0")
- assert(sizes(0) === "0")
+ assert(timestamps(0) === 0)
+ assert(sizes(0) === 0)
assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
- assert(timestamps(1) === "10")
- assert(sizes(1) === "20")
+ assert(timestamps(1) === 10)
+ assert(sizes(1) === 20)
assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
}
@@ -165,18 +163,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+ val sparkConf = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf)
+ assert(sparkConf.get(CACHED_FILES) === Nil)
+ assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Nil)
+ assert(sparkConf.get(CACHED_FILES_SIZES) === Nil)
+ assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Nil)
+ assert(sparkConf.get(CACHED_FILES_TYPES) === Nil)
}
test("test addResource archive") {
@@ -199,20 +192,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
- val env = new HashMap[String, String]()
-
- distMgr.setDistArchivesEnv(env)
- assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+ val sparkConf = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf)
+ assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
+ assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(20L))
+ assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(10L))
+ assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
+ assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.ARCHIVE.name()))
}
-
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 06efd44b5d..f196a0d8ca 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -243,9 +243,12 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
assert(sparkConf.get(SPARK_JARS) ===
Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort())
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort())
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(),
+ anyBoolean(), any())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(),
+ anyBoolean(), any())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(),
+ anyBoolean(), any())
val cp = classpath(client)
cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@@ -262,7 +265,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(),
+ anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
@@ -281,7 +285,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(),
+ anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}
@@ -382,7 +387,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val clientArgs = new ClientArguments(args)
val client = spy(new Client(clientArgs, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), anyShort())
+ any(classOf[Path]), anyShort(), anyBoolean(), any())
client
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index a641a6e73e..784c6525e5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -104,7 +104,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
sparkConfClone,
rmClient,
appAttemptId,
- new SecurityManager(sparkConf))
+ new SecurityManager(sparkConf),
+ Map())
}
def createContainer(host: String): Container = {