11 files changed, 239 insertions, 175 deletions
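
This change stops propagating YARN distributed-cache metadata from the Client to the ApplicationMaster and executors through SPARK_YARN_CACHE_* environment variables, and records it instead in internal SparkConf entries (the spark.yarn.cache.* configs defined in config.scala below). The following sketch is a simplified, hypothetical illustration of that round trip: it uses plain string keys and made-up values rather than the typed ConfigEntry objects the patch adds, but the key names match the patch.

    import org.apache.spark.SparkConf

    object CachePropagationSketch {
      def main(args: Array[String]): Unit = {
        // Client side: ClientDistributedCacheManager.updateConfiguration() records one
        // parallel sequence per attribute of each cached file.
        val conf = new SparkConf(loadDefaults = false)
        conf.set("spark.yarn.cache.filenames", "hdfs:/tmp/testing#link")
        conf.set("spark.yarn.cache.timestamps", "10")
        conf.set("spark.yarn.cache.sizes", "20")
        conf.set("spark.yarn.cache.visibilities", "PRIVATE")
        conf.set("spark.yarn.cache.types", "FILE")

        // AM side: the parallel sequences are read back and zipped up per file before
        // building YARN LocalResources (see the sketch at the end of this diff).
        val files = conf.get("spark.yarn.cache.filenames").split(',')
        val sizes = conf.get("spark.yarn.cache.sizes").split(',').map(_.toLong)
        files.zip(sizes).foreach { case (f, s) => println(s"$f -> $s bytes") }
      }
    }
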
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index acce6bc24f..4384a06bd7 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -225,6 +225,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     this
   }
 
+  private[spark] def remove(entry: ConfigEntry[_]): SparkConf = {
+    remove(entry.key)
+  }
+
   /** Get a parameter; throws a NoSuchElementException if it's not set */
   def get(key: String): String = {
     getOption(key).getOrElse(throw new NoSuchElementException(key))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d447a59937..5bb63500c8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,15 +19,17 @@ package org.apache.spark.deploy.yarn
 
 import java.io.{File, IOException}
 import java.lang.reflect.InvocationTargetException
-import java.net.{Socket, URL}
+import java.net.{Socket, URI, URL}
 import java.util.concurrent.atomic.AtomicReference
 
+import scala.collection.mutable.HashMap
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -120,6 +122,61 @@ private[spark] class ApplicationMaster(
 
   private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
 
+  // Load the list of localized files set by the client. This is used when launching executors,
+  // and is loaded here so that these configs don't pollute the Web UI's environment page in
+  // cluster mode.
+  private val localResources = {
+    logInfo("Preparing Local resources")
+    val resources = HashMap[String, LocalResource]()
+
+    def setupDistributedCache(
+        file: String,
+        rtype: LocalResourceType,
+        timestamp: String,
+        size: String,
+        vis: String): Unit = {
+      val uri = new URI(file)
+      val amJarRsrc = Records.newRecord(classOf[LocalResource])
+      amJarRsrc.setType(rtype)
+      amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+      amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+      amJarRsrc.setTimestamp(timestamp.toLong)
+      amJarRsrc.setSize(size.toLong)
+
+      val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
+      resources(fileName) = amJarRsrc
+    }
+
+    val distFiles = sparkConf.get(CACHED_FILES)
+    val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
+    val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
+    val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
+    val resTypes = sparkConf.get(CACHED_FILES_TYPES)
+
+    for (i <- 0 to distFiles.size - 1) {
+      val resType = LocalResourceType.valueOf(resTypes(i))
+      setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
+        visibilities(i))
+    }
+
+    // Distribute the conf archive to executors.
+    sparkConf.get(CACHED_CONF_ARCHIVE).foreach { uri =>
+      val fs = FileSystem.get(new URI(uri), yarnConf)
+      val status = fs.getFileStatus(new Path(uri))
+      setupDistributedCache(uri, LocalResourceType.ARCHIVE, status.getModificationTime().toString,
+        status.getLen.toString, LocalResourceVisibility.PRIVATE.name())
+    }
+
+    // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
+    CACHE_CONFIGS.foreach { e =>
+      sparkConf.remove(e)
+      sys.props.remove(e.key)
+    }
+
+    logInfo("Prepared Local resources " + resources)
+    resources.toMap
+  }
+
   def getAttemptId(): ApplicationAttemptId = {
     client.getAttemptId()
   }
@@ -292,7 +349,8 @@ private[spark] class ApplicationMaster(
       _sparkConf,
       uiAddress,
       historyAddress,
-      securityMgr)
+      securityMgr,
+      localResources)
 
     allocator.allocateResources()
     reporterThread = launchReporterThread()
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ae5fb6bbd4..8b07dc3af4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -328,12 +328,14 @@ private[spark] class Client(
   private[yarn] def copyFileToRemote(
       destDir: Path,
       srcPath: Path,
-      replication: Short): Path = {
+      replication: Short,
+      force: Boolean = false,
+      destName: Option[String] = None): Path = {
     val destFs = destDir.getFileSystem(hadoopConf)
     val srcFs = srcPath.getFileSystem(hadoopConf)
     var destPath = srcPath
-    if (!compareFs(srcFs, destFs)) {
-      destPath = new Path(destDir, srcPath.getName())
+    if (force || !compareFs(srcFs, destFs)) {
+      destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
       logInfo(s"Uploading resource $srcPath -> $destPath")
       FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
       destFs.setReplication(destPath, replication)
@@ -553,12 +555,37 @@ private[spark] class Client(
       distribute(f, targetDir = targetDir)
     }
 
-    // Distribute an archive with Hadoop and Spark configuration for the AM and executors.
+    // Update the configuration with all the distributed files, minus the conf archive. The
+    // conf archive will be handled by the AM differently so that we avoid having to send
+    // this configuration by other means. See SPARK-14602 for one reason of why this is needed.
+    distCacheMgr.updateConfiguration(sparkConf)
+
+    // Upload the conf archive to HDFS manually, and record its location in the configuration.
+    // This will allow the AM to know where the conf archive is in HDFS, so that it can be
+    // distributed to the containers.
+    //
+    // This code forces the archive to be copied, so that unit tests pass (since in that case both
+    // file systems are the same and the archive wouldn't normally be copied). In most (all?)
+    // deployments, the archive would be copied anyway, since it's a temp file in the local file
+    // system.
+    val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
+    val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
+    sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
+
+    val localConfArchive = new Path(createConfArchive().toURI())
+    copyFileToRemote(destDir, localConfArchive, replication, force = true,
+      destName = Some(LOCALIZED_CONF_ARCHIVE))
+
     val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
       resType = LocalResourceType.ARCHIVE,
       destName = Some(LOCALIZED_CONF_DIR))
     require(confLocalizedPath != null)
 
+    // Clear the cache-related entries from the configuration to avoid them polluting the
+    // UI's environment page. This works for client mode; for cluster mode, this is handled
+    // by the AM.
+    CACHE_CONFIGS.foreach(sparkConf.remove)
+
     localResources
   }
 
@@ -787,10 +814,6 @@ private[spark] class Client(
     val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
     val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
 
-    // Set the environment variables to be passed on to the executors.
-    distCacheMgr.setDistFilesEnv(launchEnv)
-    distCacheMgr.setDistArchivesEnv(launchEnv)
-
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources.asJava)
     amContainer.setEnvironment(launchEnv.asJava)
@@ -1150,6 +1173,9 @@ private object Client extends Logging {
   // Subdirectory where the user's Spark and Hadoop config files will be placed.
   val LOCALIZED_CONF_DIR = "__spark_conf__"
 
+  // File containing the conf archive in the AM. See prepareLocalResources().
+  val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"
+
   // Name of the file in the conf archive containing Spark configuration.
   val SPARK_CONF_FILE = "__spark_conf__.properties"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 869edf6c5b..dcc2288dd1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.net.URI
 
-import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -27,17 +27,21 @@ import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 
+private case class CacheEntry(
+    uri: URI,
+    size: Long,
+    modTime: Long,
+    visibility: LocalResourceVisibility,
+    resType: LocalResourceType)
+
 /** Client side methods to setup the Hadoop distributed cache */
 private[spark] class ClientDistributedCacheManager() extends Logging {
 
-  // Mappings from remote URI to (file status, modification time, visibility)
-  private val distCacheFiles: Map[String, (String, String, String)] =
-    LinkedHashMap[String, (String, String, String)]()
-  private val distCacheArchives: Map[String, (String, String, String)] =
-    LinkedHashMap[String, (String, String, String)]()
-
+  private val distCacheEntries = new ListBuffer[CacheEntry]()
 
   /**
    * Add a resource to the list of distributed cache resources. This list can
@@ -72,61 +76,33 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
     amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
     amJarRsrc.setTimestamp(destStatus.getModificationTime())
     amJarRsrc.setSize(destStatus.getLen())
-    if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
+    require(link != null && link.nonEmpty, "You must specify a valid link name.")
     localResources(link) = amJarRsrc
 
     if (!appMasterOnly) {
       val uri = destPath.toUri()
       val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
-      if (resourceType == LocalResourceType.FILE) {
-        distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
-          destStatus.getModificationTime().toString(), visibility.name())
-      } else {
-        distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
-          destStatus.getModificationTime().toString(), visibility.name())
-      }
+      distCacheEntries += CacheEntry(pathURI, destStatus.getLen(), destStatus.getModificationTime(),
+        visibility, resourceType)
     }
   }
 
   /**
-   * Adds the necessary cache file env variables to the env passed in
+   * Writes down information about cached files needed in executors to the given configuration.
    */
-  def setDistFilesEnv(env: Map[String, String]): Unit = {
-    val (keys, tupleValues) = distCacheFiles.unzip
-    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-    if (keys.size > 0) {
-      env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
-        timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
-        sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
-        visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
-    }
-  }
-
-  /**
-   * Adds the necessary cache archive env variables to the env passed in
-   */
-  def setDistArchivesEnv(env: Map[String, String]): Unit = {
-    val (keys, tupleValues) = distCacheArchives.unzip
-    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-    if (keys.size > 0) {
-      env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
-        timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
-        sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
-        visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
-    }
+  def updateConfiguration(conf: SparkConf): Unit = {
+    conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString))
+    conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size))
+    conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime))
+    conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()))
+    conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()))
   }
 
   /**
    * Returns the local resource visibility depending on the cache file permissions
    * @return LocalResourceVisibility
    */
-  def getVisibility(
+  private[yarn] def getVisibility(
       conf: Configuration,
       uri: URI,
       statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
@@ -141,7 +117,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
    * Returns a boolean to denote whether a cache file is visible to all (public)
    * @return true if the path in the uri is visible to all, false otherwise
    */
-  def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
+  private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
     val fs = FileSystem.get(uri, conf)
     val current = new Path(uri.getPath())
     // the leaf level file should be readable by others
@@ -157,7 +133,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
    * the directory hierarchy to the given path)
    * @return true if all ancestors have the 'execute' permission set for all users
    */
-  def ancestorsHaveExecutePermissions(
+  private def ancestorsHaveExecutePermissions(
      fs: FileSystem,
      path: Path,
      statCache: Map[URI, FileStatus]): Boolean = {
@@ -177,7 +153,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
    * imply the permission in the passed FsAction
    * @return true if the path in the uri is visible to all, false otherwise
    */
-  def checkPermissionOfOther(
+  private def checkPermissionOfOther(
      fs: FileSystem,
      path: Path,
      action: FsAction,
@@ -194,7 +170,10 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
    * it in the cache, and returns the FileStatus.
    * @return FileStatus
    */
-  def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
+  private[yarn] def getFileStatus(
+      fs: FileSystem,
+      uri: URI,
+      statCache: Map[URI, FileStatus]): FileStatus = {
     val stat = statCache.get(uri) match {
       case Some(existstat) => existstat
       case None =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ef7908a3ef..3d370e6d71 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.URI
 import java.nio.ByteBuffer
 import java.util.Collections
 
@@ -55,7 +54,8 @@ private[yarn] class ExecutorRunnable(
     executorMemory: Int,
     executorCores: Int,
     appId: String,
-    securityMgr: SecurityManager)
+    securityMgr: SecurityManager,
+    localResources: Map[String, LocalResource])
   extends Runnable with Logging {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
@@ -77,9 +77,7 @@ private[yarn] class ExecutorRunnable(
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
       .asInstanceOf[ContainerLaunchContext]
 
-    val localResources = prepareLocalResources
     ctx.setLocalResources(localResources.asJava)
-
     ctx.setEnvironment(env.asJava)
 
     val credentials = UserGroupInformation.getCurrentUser().getCredentials()
@@ -88,7 +86,7 @@ private[yarn] class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      appId, localResources)
+      appId)
 
     logInfo(s"""
       |===============================================================================
@@ -136,8 +134,7 @@ private[yarn] class ExecutorRunnable(
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      appId: String,
-      localResources: HashMap[String, LocalResource]): List[String] = {
+      appId: String): List[String] = {
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()
 
@@ -239,53 +236,6 @@ private[yarn] class ExecutorRunnable(
     commands.map(s => if (s == null) "null" else s).toList
   }
 
-  private def setupDistributedCache(
-      file: String,
-      rtype: LocalResourceType,
-      localResources: HashMap[String, LocalResource],
-      timestamp: String,
-      size: String,
-      vis: String): Unit = {
-    val uri = new URI(file)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource])
-    amJarRsrc.setType(rtype)
-    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
-    amJarRsrc.setTimestamp(timestamp.toLong)
-    amJarRsrc.setSize(size.toLong)
-    localResources(uri.getFragment()) = amJarRsrc
-  }
-
-  private def prepareLocalResources: HashMap[String, LocalResource] = {
-    logInfo("Preparing Local resources")
-    val localResources = HashMap[String, LocalResource]()
-
-    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
-      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
-      for( i <- 0 to distFiles.length - 1) {
-        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
-          fileSizes(i), visibilities(i))
-      }
-    }
-
-    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
-      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
-      for( i <- 0 to distArchives.length - 1) {
-        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
-          timeStamps(i), fileSizes(i), visibilities(i))
-      }
-    }
-
-    logInfo("Prepared Local resources " + localResources)
-    localResources
-  }
-
   private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
     Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 23742eab62..b59e6cff2f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -63,7 +63,8 @@ private[yarn] class YarnAllocator(
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],
     appAttemptId: ApplicationAttemptId,
-    securityMgr: SecurityManager)
+    securityMgr: SecurityManager,
+    localResources: Map[String, LocalResource])
   extends Logging {
 
   import YarnAllocator._
@@ -477,7 +478,8 @@ private[yarn] class YarnAllocator(
             executorMemory,
             executorCores,
             appAttemptId.getApplicationId.toString,
-            securityMgr)
+            securityMgr,
+            localResources)
           if (launchContainers) {
             logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
               driverUrl, executorHostname))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index e7f7544664..53df11eb66 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy.yarn
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
-import scala.collection.Map
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
@@ -52,6 +51,8 @@ private[spark] class YarnRMClient extends Logging {
    * @param sparkConf The Spark configuration.
   * @param uiAddress Address of the SparkUI.
   * @param uiHistoryAddress Address of the application on the History Server.
+   * @param securityMgr The security manager.
+   * @param localResources Map with information about files distributed via YARN's cache.
    */
   def register(
       driverUrl: String,
@@ -60,7 +61,8 @@ private[spark] class YarnRMClient extends Logging {
       sparkConf: SparkConf,
       uiAddress: String,
       uiHistoryAddress: String,
-      securityMgr: SecurityManager
+      securityMgr: SecurityManager,
+      localResources: Map[String, LocalResource]
     ): YarnAllocator = {
     amClient = AMRMClient.createAMRMClient()
     amClient.init(conf)
@@ -72,7 +74,8 @@ private[spark] class YarnRMClient extends Logging {
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr)
+    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
+      localResources)
   }
 
   /**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index edfbfc5d58..3816a84ab2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -261,4 +261,54 @@ package object config {
     .stringConf
     .toSequence
     .createOptional
+
+  /* Configuration and cached file propagation. */
+
+  private[spark] val CACHED_FILES = ConfigBuilder("spark.yarn.cache.filenames")
+    .internal()
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val CACHED_FILES_SIZES = ConfigBuilder("spark.yarn.cache.sizes")
+    .internal()
+    .longConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val CACHED_FILES_TIMESTAMPS = ConfigBuilder("spark.yarn.cache.timestamps")
+    .internal()
+    .longConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val CACHED_FILES_VISIBILITIES = ConfigBuilder("spark.yarn.cache.visibilities")
+    .internal()
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  // Either "file" or "archive", for each file.
+  private[spark] val CACHED_FILES_TYPES = ConfigBuilder("spark.yarn.cache.types")
+    .internal()
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  // The location of the conf archive in HDFS.
+  private[spark] val CACHED_CONF_ARCHIVE = ConfigBuilder("spark.yarn.cache.confArchive")
+    .internal()
+    .stringConf
+    .createOptional
+
+  // The list of cache-related config entries. This is used by Client and the AM to clean
+  // up the environment so that these settings do not appear on the web UI.
+  private[yarn] val CACHE_CONFIGS = Seq(
+    CACHED_FILES,
+    CACHED_FILES_SIZES,
+    CACHED_FILES_TIMESTAMPS,
+    CACHED_FILES_VISIBILITIES,
+    CACHED_FILES_TYPES,
+    CACHED_CONF_ARCHIVE)
+
 }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index ac8f663df2..b696e080ce 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.mockito.Mockito.when
 import org.scalatest.mock.MockitoSugar
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
 
 class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar {
 
@@ -84,18 +85,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
     assert(resource.getSize() === 0)
     assert(resource.getType() === LocalResourceType.FILE)
 
-    val env = new HashMap[String, String]()
-    distMgr.setDistFilesEnv(env)
-    assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
-    assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
-    assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
-    assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
-    distMgr.setDistArchivesEnv(env)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+    val sparkConf = new SparkConf(false)
+    distMgr.updateConfiguration(sparkConf)
+    assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
+    assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(0L))
+    assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(0L))
+    assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
+    assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.FILE.name()))
 
     // add another one and verify both there and order correct
     val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
@@ -111,20 +107,22 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
     assert(resource2.getSize() === 20)
     assert(resource2.getType() === LocalResourceType.FILE)
 
-    val env2 = new HashMap[String, String]()
-    distMgr.setDistFilesEnv(env2)
-    val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
-    val files = env2("SPARK_YARN_CACHE_FILES").split(',')
-    val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
-    val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
+    val sparkConf2 = new SparkConf(false)
+    distMgr.updateConfiguration(sparkConf2)
+
+    val files = sparkConf2.get(CACHED_FILES)
+    val sizes = sparkConf2.get(CACHED_FILES_SIZES)
+    val timestamps = sparkConf2.get(CACHED_FILES_TIMESTAMPS)
+    val visibilities = sparkConf2.get(CACHED_FILES_VISIBILITIES)
+
     assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
-    assert(timestamps(0) === "0")
-    assert(sizes(0) === "0")
+    assert(timestamps(0) === 0)
+    assert(sizes(0) === 0)
     assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
+
     assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
-    assert(timestamps(1) === "10")
-    assert(sizes(1) === "20")
+    assert(timestamps(1) === 10)
+    assert(sizes(1) === 20)
     assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
   }
 
@@ -165,18 +163,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
     assert(resource.getSize() === 20)
     assert(resource.getType() === LocalResourceType.ARCHIVE)
 
-    val env = new HashMap[String, String]()
-    distMgr.setDistFilesEnv(env)
-    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
-
-    distMgr.setDistArchivesEnv(env)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+    val sparkConf = new SparkConf(false)
+    distMgr.updateConfiguration(sparkConf)
+    assert(sparkConf.get(CACHED_FILES) === Nil)
+    assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Nil)
+    assert(sparkConf.get(CACHED_FILES_SIZES) === Nil)
+    assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Nil)
+    assert(sparkConf.get(CACHED_FILES_TYPES) === Nil)
   }
 
   test("test addResource archive") {
@@ -199,20 +192,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
     assert(resource.getSize() === 20)
     assert(resource.getType() === LocalResourceType.ARCHIVE)
 
-    val env = new HashMap[String, String]()
-
-    distMgr.setDistArchivesEnv(env)
-    assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
-    assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
-    assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
-    assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
-    distMgr.setDistFilesEnv(env)
-    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+    val sparkConf = new SparkConf(false)
+    distMgr.updateConfiguration(sparkConf)
+    assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
+    assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(20L))
+    assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(10L))
+    assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
+    assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.ARCHIVE.name()))
   }
-
 }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 06efd44b5d..f196a0d8ca 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -243,9 +243,12 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     assert(sparkConf.get(SPARK_JARS) ===
       Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
 
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort())
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort())
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(),
+      anyBoolean(), any())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(),
+      anyBoolean(), any())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(),
+      anyBoolean(), any())
 
     val cp = classpath(client)
     cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@@ -262,7 +265,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val client = createClient(sparkConf)
     client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
 
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(),
+      anyBoolean(), any())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
 
     sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
@@ -281,7 +285,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
     val client = createClient(sparkConf)
     client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(),
+      anyBoolean(), any())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
   }
 
@@ -382,7 +387,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val clientArgs = new ClientArguments(args)
     val client = spy(new Client(clientArgs, conf, sparkConf))
     doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), anyShort())
+      any(classOf[Path]), anyShort(), anyBoolean(), any())
     client
   }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index a641a6e73e..784c6525e5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -104,7 +104,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConfClone,
       rmClient,
      appAttemptId,
-      new SecurityManager(sparkConf))
+      new SecurityManager(sparkConf),
+      Map())
   }
 
   def createContainer(host: String): Container = {
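
For reference, here is a minimal self-contained sketch of the per-file conversion that the removed ExecutorRunnable.setupDistributedCache() and the new ApplicationMaster code both perform: turning one cached-file record into a YARN LocalResource keyed by the link name. The object name and sample values are invented for illustration; the YARN API calls mirror those in the patch.

    import java.net.URI

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
    import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

    object LocalResourceSketch {

      // Build the (link name -> LocalResource) pair for a single cached file.
      def toLocalResource(
          file: String,
          rtype: LocalResourceType,
          timestamp: Long,
          size: Long,
          vis: String): (String, LocalResource) = {
        val uri = new URI(file)
        val resource = Records.newRecord(classOf[LocalResource])
        resource.setType(rtype)
        resource.setVisibility(LocalResourceVisibility.valueOf(vis))
        resource.setResource(ConverterUtils.getYarnUrlFromURI(uri))
        resource.setTimestamp(timestamp)
        resource.setSize(size)
        // The new AM code falls back to the path's file name when the URI has no
        // fragment; the old executor-side code assumed a fragment was always present.
        val name = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
        name -> resource
      }

      def main(args: Array[String]): Unit = {
        val (name, resource) = toLocalResource(
          "hdfs:/tmp/testing#link", LocalResourceType.FILE, 10L, 20L, "PRIVATE")
        println(s"$name -> $resource")
      }
    }
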