-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                        4
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                   62
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                              42
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala       79
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala                    58
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                        6
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala                         9
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala                              50
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala  84
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala                         17
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala                   3
11 files changed, 239 insertions(+), 175 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index acce6bc24f..4384a06bd7 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -225,6 +225,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}
+ private[spark] def remove(entry: ConfigEntry[_]): SparkConf = {
+ remove(entry.key)
+ }
+
/** Get a parameter; throws a NoSuchElementException if it's not set */
def get(key: String): String = {
getOption(key).getOrElse(throw new NoSuchElementException(key))
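
Note: a minimal usage sketch of the new ConfigEntry-based overload. The entry name below is made up for illustration, and the call assumes code living inside the org.apache.spark package, since remove(ConfigEntry) is private[spark]:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder

    // Hypothetical entry, defined only for this sketch.
    val DEMO_ENTRY = ConfigBuilder("spark.demo.setting").stringConf.createWithDefault("x")

    val conf = new SparkConf(false).set(DEMO_ENTRY.key, "y")
    conf.remove(DEMO_ENTRY)                 // same effect as conf.remove("spark.demo.setting")
    assert(!conf.contains(DEMO_ENTRY.key))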
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d447a59937..5bb63500c8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,15 +19,17 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException
-import java.net.{Socket, URL}
+import java.net.{Socket, URI, URL}
import java.util.concurrent.atomic.AtomicReference
+import scala.collection.mutable.HashMap
import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -120,6 +122,61 @@ private[spark] class ApplicationMaster(
private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
+ // Load the list of localized files set by the client. This is used when launching executors,
+ // and is loaded here so that these configs don't pollute the Web UI's environment page in
+ // cluster mode.
+ private val localResources = {
+ logInfo("Preparing Local resources")
+ val resources = HashMap[String, LocalResource]()
+
+ def setupDistributedCache(
+ file: String,
+ rtype: LocalResourceType,
+ timestamp: String,
+ size: String,
+ vis: String): Unit = {
+ val uri = new URI(file)
+ val amJarRsrc = Records.newRecord(classOf[LocalResource])
+ amJarRsrc.setType(rtype)
+ amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+ amJarRsrc.setTimestamp(timestamp.toLong)
+ amJarRsrc.setSize(size.toLong)
+
+ val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
+ resources(fileName) = amJarRsrc
+ }
+
+ val distFiles = sparkConf.get(CACHED_FILES)
+ val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
+ val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
+ val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
+ val resTypes = sparkConf.get(CACHED_FILES_TYPES)
+
+ for (i <- 0 to distFiles.size - 1) {
+ val resType = LocalResourceType.valueOf(resTypes(i))
+ setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
+ visibilities(i))
+ }
+
+ // Distribute the conf archive to executors.
+ sparkConf.get(CACHED_CONF_ARCHIVE).foreach { uri =>
+ val fs = FileSystem.get(new URI(uri), yarnConf)
+ val status = fs.getFileStatus(new Path(uri))
+ setupDistributedCache(uri, LocalResourceType.ARCHIVE, status.getModificationTime().toString,
+ status.getLen.toString, LocalResourceVisibility.PRIVATE.name())
+ }
+
+ // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
+ CACHE_CONFIGS.foreach { e =>
+ sparkConf.remove(e)
+ sys.props.remove(e.key)
+ }
+
+ logInfo("Prepared Local resources " + resources)
+ resources.toMap
+ }
+
def getAttemptId(): ApplicationAttemptId = {
client.getAttemptId()
}
@@ -292,7 +349,8 @@ private[spark] class ApplicationMaster(
_sparkConf,
uiAddress,
historyAddress,
- securityMgr)
+ securityMgr,
+ localResources)
allocator.allocateResources()
reporterThread = launchReporterThread()
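
Note: a hedged sketch of the hand-off this block relies on. The client (see Client.scala below) writes five index-aligned lists into the configuration; the loop above rebuilds one LocalResource per index and passes the resulting map to every executor launch. Values here are illustrative, and the typed set/get calls are private[spark]:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    val conf = new SparkConf(false)
      .set(CACHED_FILES, Seq("hdfs://nn:8020/user/test/app.jar#app.jar"))
      .set(CACHED_FILES_SIZES, Seq(1024L))
      .set(CACHED_FILES_TIMESTAMPS, Seq(1460000000000L))
      .set(CACHED_FILES_VISIBILITIES, Seq("PRIVATE"))
      .set(CACHED_FILES_TYPES, Seq("FILE"))

    // Entry 0 of each list describes the same cached file; the AM keys the resulting
    // LocalResource by the URI fragment ("app.jar"), falling back to the path's file name.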
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ae5fb6bbd4..8b07dc3af4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -328,12 +328,14 @@ private[spark] class Client(
private[yarn] def copyFileToRemote(
destDir: Path,
srcPath: Path,
- replication: Short): Path = {
+ replication: Short,
+ force: Boolean = false,
+ destName: Option[String] = None): Path = {
val destFs = destDir.getFileSystem(hadoopConf)
val srcFs = srcPath.getFileSystem(hadoopConf)
var destPath = srcPath
- if (!compareFs(srcFs, destFs)) {
- destPath = new Path(destDir, srcPath.getName())
+ if (force || !compareFs(srcFs, destFs)) {
+ destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
logInfo(s"Uploading resource $srcPath -> $destPath")
FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
destFs.setReplication(destPath, replication)
@@ -553,12 +555,37 @@ private[spark] class Client(
distribute(f, targetDir = targetDir)
}
- // Distribute an archive with Hadoop and Spark configuration for the AM and executors.
+ // Update the configuration with all the distributed files, minus the conf archive. The
+ // conf archive will be handled by the AM differently so that we avoid having to send
+ this configuration by other means. See SPARK-14602 for one reason why this is needed.
+ distCacheMgr.updateConfiguration(sparkConf)
+
+ // Upload the conf archive to HDFS manually, and record its location in the configuration.
+ // This will allow the AM to know where the conf archive is in HDFS, so that it can be
+ // distributed to the containers.
+ //
+ // This code forces the archive to be copied, so that unit tests pass (since in that case both
+ // file systems are the same and the archive wouldn't normally be copied). In most (all?)
+ // deployments, the archive would be copied anyway, since it's a temp file in the local file
+ // system.
+ val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
+ val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
+ sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
+
+ val localConfArchive = new Path(createConfArchive().toURI())
+ copyFileToRemote(destDir, localConfArchive, replication, force = true,
+ destName = Some(LOCALIZED_CONF_ARCHIVE))
+
val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_CONF_DIR))
require(confLocalizedPath != null)
+ // Clear the cache-related entries from the configuration to avoid them polluting the
+ // UI's environment page. This works for client mode; for cluster mode, this is handled
+ // by the AM.
+ CACHE_CONFIGS.foreach(sparkConf.remove)
+
localResources
}
@@ -787,10 +814,6 @@ private[spark] class Client(
val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
- // Set the environment variables to be passed on to the executors.
- distCacheMgr.setDistFilesEnv(launchEnv)
- distCacheMgr.setDistArchivesEnv(launchEnv)
-
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
@@ -1150,6 +1173,9 @@ private object Client extends Logging {
// Subdirectory where the user's Spark and Hadoop config files will be placed.
val LOCALIZED_CONF_DIR = "__spark_conf__"
+ // File containing the conf archive in the AM. See prepareLocalResources().
+ val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"
+
// Name of the file in the conf archive containing Spark configuration.
val SPARK_CONF_FILE = "__spark_conf__.properties"
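
Note: a hedged sketch of how the conf archive location flows from the client to the AM (the path is illustrative, and the typed set/get calls are private[spark]):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    // Client side: record where the archive was uploaded.
    val conf = new SparkConf(false)
      .set(CACHED_CONF_ARCHIVE, "hdfs://nn:8020/user/test/.sparkStaging/app_1/__spark_conf__.zip")

    // AM side: the entry is optional, so nothing is localized when it was never set.
    conf.get(CACHED_CONF_ARCHIVE).foreach { uri =>
      // look up size and timestamp via FileSystem.getFileStatus, then register the archive
      // as a PRIVATE LocalResource of type ARCHIVE (as ApplicationMaster.scala does above)
    }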
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 869edf6c5b..dcc2288dd1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn
import java.net.URI
-import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -27,17 +27,21 @@ import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
+private case class CacheEntry(
+ uri: URI,
+ size: Long,
+ modTime: Long,
+ visibility: LocalResourceVisibility,
+ resType: LocalResourceType)
+
/** Client side methods to setup the Hadoop distributed cache */
private[spark] class ClientDistributedCacheManager() extends Logging {
- // Mappings from remote URI to (file status, modification time, visibility)
- private val distCacheFiles: Map[String, (String, String, String)] =
- LinkedHashMap[String, (String, String, String)]()
- private val distCacheArchives: Map[String, (String, String, String)] =
- LinkedHashMap[String, (String, String, String)]()
-
+ private val distCacheEntries = new ListBuffer[CacheEntry]()
/**
* Add a resource to the list of distributed cache resources. This list can
@@ -72,61 +76,33 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
- if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
+ require(link != null && link.nonEmpty, "You must specify a valid link name.")
localResources(link) = amJarRsrc
if (!appMasterOnly) {
val uri = destPath.toUri()
val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
- if (resourceType == LocalResourceType.FILE) {
- distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
- destStatus.getModificationTime().toString(), visibility.name())
- } else {
- distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
- destStatus.getModificationTime().toString(), visibility.name())
- }
+ distCacheEntries += CacheEntry(pathURI, destStatus.getLen(), destStatus.getModificationTime(),
+ visibility, resourceType)
}
}
/**
- * Adds the necessary cache file env variables to the env passed in
+ * Writes down information about cached files needed in executors to the given configuration.
*/
- def setDistFilesEnv(env: Map[String, String]): Unit = {
- val (keys, tupleValues) = distCacheFiles.unzip
- val (sizes, timeStamps, visibilities) = tupleValues.unzip3
- if (keys.size > 0) {
- env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
- timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
- sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
- visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
- }
- }
-
- /**
- * Adds the necessary cache archive env variables to the env passed in
- */
- def setDistArchivesEnv(env: Map[String, String]): Unit = {
- val (keys, tupleValues) = distCacheArchives.unzip
- val (sizes, timeStamps, visibilities) = tupleValues.unzip3
- if (keys.size > 0) {
- env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
- timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
- sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
- visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
- }
+ def updateConfiguration(conf: SparkConf): Unit = {
+ conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString))
+ conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size))
+ conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime))
+ conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()))
+ conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()))
}
/**
* Returns the local resource visibility depending on the cache file permissions
* @return LocalResourceVisibility
*/
- def getVisibility(
+ private[yarn] def getVisibility(
conf: Configuration,
uri: URI,
statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
@@ -141,7 +117,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* Returns a boolean to denote whether a cache file is visible to all (public)
* @return true if the path in the uri is visible to all, false otherwise
*/
- def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
+ private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
val fs = FileSystem.get(uri, conf)
val current = new Path(uri.getPath())
// the leaf level file should be readable by others
@@ -157,7 +133,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* the directory hierarchy to the given path)
* @return true if all ancestors have the 'execute' permission set for all users
*/
- def ancestorsHaveExecutePermissions(
+ private def ancestorsHaveExecutePermissions(
fs: FileSystem,
path: Path,
statCache: Map[URI, FileStatus]): Boolean = {
@@ -177,7 +153,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* imply the permission in the passed FsAction
* @return true if the path in the uri is visible to all, false otherwise
*/
- def checkPermissionOfOther(
+ private def checkPermissionOfOther(
fs: FileSystem,
path: Path,
action: FsAction,
@@ -194,7 +170,10 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* it in the cache, and returns the FileStatus.
* @return FileStatus
*/
- def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
+ private[yarn] def getFileStatus(
+ fs: FileSystem,
+ uri: URI,
+ statCache: Map[URI, FileStatus]): FileStatus = {
val stat = statCache.get(uri) match {
case Some(existstat) => existstat
case None =>
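
Note: a short sketch of the "#fragment" convention used for the cached file names (host and paths are illustrative). addResource() appends the link name as a URI fragment, and both the AM and the test assertions below rely on that form:

    import java.net.URI
    import org.apache.hadoop.fs.Path

    val dest = new Path("hdfs://nn:8020/user/test/app.jar").toUri()
    val withLink = new URI(dest.getScheme(), dest.getAuthority(), dest.getPath(), null, "app.jar")
    // withLink.toString == "hdfs://nn:8020/user/test/app.jar#app.jar"
    // On the AM side, uri.getFragment() ("app.jar") becomes the localized file name.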
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ef7908a3ef..3d370e6d71 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.yarn
import java.io.File
-import java.net.URI
import java.nio.ByteBuffer
import java.util.Collections
@@ -55,7 +54,8 @@ private[yarn] class ExecutorRunnable(
executorMemory: Int,
executorCores: Int,
appId: String,
- securityMgr: SecurityManager)
+ securityMgr: SecurityManager,
+ localResources: Map[String, LocalResource])
extends Runnable with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
@@ -77,9 +77,7 @@ private[yarn] class ExecutorRunnable(
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
- val localResources = prepareLocalResources
ctx.setLocalResources(localResources.asJava)
-
ctx.setEnvironment(env.asJava)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
@@ -88,7 +86,7 @@ private[yarn] class ExecutorRunnable(
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- appId, localResources)
+ appId)
logInfo(s"""
|===============================================================================
@@ -136,8 +134,7 @@ private[yarn] class ExecutorRunnable(
hostname: String,
executorMemory: Int,
executorCores: Int,
- appId: String,
- localResources: HashMap[String, LocalResource]): List[String] = {
+ appId: String): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
@@ -239,53 +236,6 @@ private[yarn] class ExecutorRunnable(
commands.map(s => if (s == null) "null" else s).toList
}
- private def setupDistributedCache(
- file: String,
- rtype: LocalResourceType,
- localResources: HashMap[String, LocalResource],
- timestamp: String,
- size: String,
- vis: String): Unit = {
- val uri = new URI(file)
- val amJarRsrc = Records.newRecord(classOf[LocalResource])
- amJarRsrc.setType(rtype)
- amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
- amJarRsrc.setTimestamp(timestamp.toLong)
- amJarRsrc.setSize(size.toLong)
- localResources(uri.getFragment()) = amJarRsrc
- }
-
- private def prepareLocalResources: HashMap[String, LocalResource] = {
- logInfo("Preparing Local resources")
- val localResources = HashMap[String, LocalResource]()
-
- if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
- for( i <- 0 to distFiles.length - 1) {
- setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
- fileSizes(i), visibilities(i))
- }
- }
-
- if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
- val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
- for( i <- 0 to distArchives.length - 1) {
- setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
- timeStamps(i), fileSizes(i), visibilities(i))
- }
- }
-
- logInfo("Prepared Local resources " + localResources)
- localResources
- }
-
private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 23742eab62..b59e6cff2f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -63,7 +63,8 @@ private[yarn] class YarnAllocator(
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
- securityMgr: SecurityManager)
+ securityMgr: SecurityManager,
+ localResources: Map[String, LocalResource])
extends Logging {
import YarnAllocator._
@@ -477,7 +478,8 @@ private[yarn] class YarnAllocator(
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
- securityMgr)
+ securityMgr,
+ localResources)
if (launchContainers) {
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
driverUrl, executorHostname))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index e7f7544664..53df11eb66 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy.yarn
import java.util.{List => JList}
import scala.collection.JavaConverters._
-import scala.collection.Map
import scala.util.Try
import org.apache.hadoop.conf.Configuration
@@ -52,6 +51,8 @@ private[spark] class YarnRMClient extends Logging {
* @param sparkConf The Spark configuration.
* @param uiAddress Address of the SparkUI.
* @param uiHistoryAddress Address of the application on the History Server.
+ * @param securityMgr The security manager.
+ * @param localResources Map with information about files distributed via YARN's cache.
*/
def register(
driverUrl: String,
@@ -60,7 +61,8 @@ private[spark] class YarnRMClient extends Logging {
sparkConf: SparkConf,
uiAddress: String,
uiHistoryAddress: String,
- securityMgr: SecurityManager
+ securityMgr: SecurityManager,
+ localResources: Map[String, LocalResource]
): YarnAllocator = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
@@ -72,7 +74,8 @@ private[spark] class YarnRMClient extends Logging {
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
- new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr)
+ new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
+ localResources)
}
/**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index edfbfc5d58..3816a84ab2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -261,4 +261,54 @@ package object config {
.stringConf
.toSequence
.createOptional
+
+ /* Configuration and cached file propagation. */
+
+ private[spark] val CACHED_FILES = ConfigBuilder("spark.yarn.cache.filenames")
+ .internal()
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ private[spark] val CACHED_FILES_SIZES = ConfigBuilder("spark.yarn.cache.sizes")
+ .internal()
+ .longConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ private[spark] val CACHED_FILES_TIMESTAMPS = ConfigBuilder("spark.yarn.cache.timestamps")
+ .internal()
+ .longConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ private[spark] val CACHED_FILES_VISIBILITIES = ConfigBuilder("spark.yarn.cache.visibilities")
+ .internal()
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ // Either "file" or "archive", for each file.
+ private[spark] val CACHED_FILES_TYPES = ConfigBuilder("spark.yarn.cache.types")
+ .internal()
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ // The location of the conf archive in HDFS.
+ private[spark] val CACHED_CONF_ARCHIVE = ConfigBuilder("spark.yarn.cache.confArchive")
+ .internal()
+ .stringConf
+ .createOptional
+
+ // The list of cache-related config entries. This is used by Client and the AM to clean
+ // up the environment so that these settings do not appear on the web UI.
+ private[yarn] val CACHE_CONFIGS = Seq(
+ CACHED_FILES,
+ CACHED_FILES_SIZES,
+ CACHED_FILES_TIMESTAMPS,
+ CACHED_FILES_VISIBILITIES,
+ CACHED_FILES_TYPES,
+ CACHED_CONF_ARCHIVE)
+
}
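
Note: a small sketch of why these entries default to empty values (illustrative; the typed get is private[spark]). An application that distributes nothing simply produces zero iterations in the AM's localization loop instead of failing on a missing key:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    val conf = new SparkConf(false)
    assert(conf.get(CACHED_FILES) == Nil)           // createWithDefault(Nil)
    assert(conf.get(CACHED_CONF_ARCHIVE) == None)   // createOptional, so Option[String]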
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index ac8f663df2..b696e080ce 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar {
@@ -84,18 +85,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 0)
assert(resource.getType() === LocalResourceType.FILE)
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+ val sparkConf = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf)
+ assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
+ assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(0L))
+ assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(0L))
+ assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
+ assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.FILE.name()))
// add another one and verify both there and order correct
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
@@ -111,20 +107,22 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource2.getSize() === 20)
assert(resource2.getType() === LocalResourceType.FILE)
- val env2 = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env2)
- val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val files = env2("SPARK_YARN_CACHE_FILES").split(',')
- val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
+ val sparkConf2 = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf2)
+
+ val files = sparkConf2.get(CACHED_FILES)
+ val sizes = sparkConf2.get(CACHED_FILES_SIZES)
+ val timestamps = sparkConf2.get(CACHED_FILES_TIMESTAMPS)
+ val visibilities = sparkConf2.get(CACHED_FILES_VISIBILITIES)
+
assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(timestamps(0) === "0")
- assert(sizes(0) === "0")
+ assert(timestamps(0) === 0)
+ assert(sizes(0) === 0)
assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
- assert(timestamps(1) === "10")
- assert(sizes(1) === "20")
+ assert(timestamps(1) === 10)
+ assert(sizes(1) === 20)
assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
}
@@ -165,18 +163,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+ val sparkConf = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf)
+ assert(sparkConf.get(CACHED_FILES) === Nil)
+ assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Nil)
+ assert(sparkConf.get(CACHED_FILES_SIZES) === Nil)
+ assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Nil)
+ assert(sparkConf.get(CACHED_FILES_TYPES) === Nil)
}
test("test addResource archive") {
@@ -199,20 +192,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
- val env = new HashMap[String, String]()
-
- distMgr.setDistArchivesEnv(env)
- assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+ val sparkConf = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf)
+ assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
+ assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(20L))
+ assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(10L))
+ assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
+ assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.ARCHIVE.name()))
}
-
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 06efd44b5d..f196a0d8ca 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -243,9 +243,12 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
assert(sparkConf.get(SPARK_JARS) ===
Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort())
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort())
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(),
+ anyBoolean(), any())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(),
+ anyBoolean(), any())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(),
+ anyBoolean(), any())
val cp = classpath(client)
cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@@ -262,7 +265,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(),
+ anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
@@ -281,7 +285,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(),
+ anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}
@@ -382,7 +387,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val clientArgs = new ClientArguments(args)
val client = spy(new Client(clientArgs, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), anyShort())
+ any(classOf[Path]), anyShort(), anyBoolean(), any())
client
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index a641a6e73e..784c6525e5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -104,7 +104,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
sparkConfClone,
rmClient,
appAttemptId,
- new SecurityManager(sparkConf))
+ new SecurityManager(sparkConf),
+ Map())
}
def createContainer(host: String): Container = {