author    Prashant Sharma <prashant.s@imaginea.com>  2013-11-21 11:55:48 +0530
committer Prashant Sharma <prashant.s@imaginea.com>  2013-11-21 11:55:48 +0530
commit    199e9cf02dfaa372c1f067bca54556e1f6ce787d (patch)
tree      7ea8ac8abb51f3e3cb918b7ba147cb4d909c5f99 /yarn
parent    6860b79f6e4cc0d38b08848f19127c259d9b5069 (diff)
parent    f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc (diff)
Merge branch 'scala210-master' of github.com:colorant/incubator-spark into scala-2.10
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/client/Client.scala
	core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/pom.xml                                                                          |  50
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala              |  55
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                        | 258
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala               |  25
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala | 228
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala                |  83
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala         |   4
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala | 220
8 files changed, 812 insertions(+), 111 deletions(-)
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 7770cbb0cc..12bc97da8a 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -61,6 +61,16 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.9.3</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -106,6 +116,46 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>test</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <exportAntProperties>true</exportAntProperties>
+ <tasks>
+ <property name="spark.classpath" refid="maven.test.classpath" />
+ <property environment="env" />
+ <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variable and retry.">
+ <condition>
+ <not>
+ <or>
+ <isset property="env.SCALA_HOME" />
+ <isset property="env.SCALA_LIBRARY_PATH" />
+ </or>
+ </not>
+ </condition>
+ </fail>
+ </tasks>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <environmentVariables>
+ <SPARK_HOME>${basedir}/..</SPARK_HOME>
+ <SPARK_TESTING>1</SPARK_TESTING>
+ <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
+ </environmentVariables>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
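
The scalatest-maven-plugin configuration above forwards SPARK_HOME, SPARK_TESTING and SPARK_CLASSPATH into the forked test JVM, with spark.classpath exported from maven.test.classpath by the antrun task. A minimal sketch (not part of this patch; names here are hypothetical) of how a forked test could read that environment:

    object TestEnvCheck {
      def main(args: Array[String]) {
        // SPARK_TESTING=1 signals test mode; SPARK_HOME points at the repo root
        val testing = sys.env.getOrElse("SPARK_TESTING", "0") == "1"
        val sparkHome = sys.env.get("SPARK_HOME")
        println("testing=" + testing + ", home=" + sparkHome)
      }
    }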
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 858b58d338..4302ef4cda 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,22 +17,25 @@
package org.apache.spark.deploy.yarn
+import java.io.IOException;
import java.net.Socket
+import java.security.PrivilegedExceptionAction
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import scala.collection.JavaConversions._
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.Utils
import org.apache.hadoop.security.UserGroupInformation
-import java.security.PrivilegedExceptionAction
+import scala.collection.JavaConversions._
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
@@ -43,18 +46,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var appAttemptId: ApplicationAttemptId = null
private var userThread: Thread = null
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ private val fs = FileSystem.get(yarnConf)
private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
private var uiAddress: String = ""
+ private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+ private var isLastAMRetry: Boolean = true
def run() {
// setup the directories so things go to yarn approved directories rather
// than user specified and /tmp
System.setProperty("spark.local.dir", getLocalDirs())
+
+ // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce uses.
+ ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
appAttemptId = getApplicationAttemptId()
+ isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
resourceManager = registerWithResourceManager()
// Workaround until hadoop moves to something which has
@@ -183,6 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// It needs the shutdown hook to set SUCCEEDED
successed = true
} finally {
+ logDebug("finishing main")
+ isLastAMRetry = true;
if (successed) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
@@ -229,8 +242,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
-
-
private def allocateWorkers() {
try {
logInfo("Allocating " + args.numWorkers + " workers.")
@@ -329,6 +340,40 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
resourceManager.finishApplicationMaster(finishReq)
}
+
+ /**
+ * clean up the staging directory.
+ */
+ private def cleanupStagingDir() {
+ var stagingDirPath: Path = null
+ try {
+ val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+ if (!preserveFiles) {
+ stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+ if (stagingDirPath == null) {
+ logError("Staging directory is null")
+ return
+ }
+ logInfo("Deleting staging directory " + stagingDirPath)
+ fs.delete(stagingDirPath, true)
+ }
+ } catch {
+ case e: IOException =>
+ logError("Failed to cleanup staging dir " + stagingDirPath, e)
+ }
+ }
+
+ // The shutdown hook that runs when a signal is received AND during normal
+ // close of the JVM.
+ class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
+
+ def run() {
+ logInfo("AppMaster received a signal.")
+ // we need to clean up staging dir before HDFS is shut down
+ // make sure we don't delete it until this is the last AM
+ if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+ }
+ }
}
@@ -368,6 +413,8 @@ object ApplicationMaster {
// Add a shutdown hook - as a best-effort attempt in case users do not call sc.stop or do System.exit
// Should not really have to do this, but it helps yarn to evict resources earlier.
// not to mention preventing the Client from declaring failure even though we exited properly.
+ // Note that this will unfortunately not properly clean up the staging files because it gets called too
+ // late and the filesystem is already shut down.
if (modified) {
Runtime.getRuntime().addShutdownHook(new Thread with Logging {
// This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
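
The patch registers the staging-directory cleanup through Hadoop's ShutdownHookManager at priority 30 so it runs before HDFS's own FileSystem shutdown hook; the plain Runtime.getRuntime().addShutdownHook used in the ApplicationMaster object gives no such ordering guarantee, which is why the note above says it runs too late to clean staging files. A minimal sketch of the priority mechanism, assuming only the Hadoop API already used in this patch:

    import org.apache.hadoop.util.ShutdownHookManager

    object HookOrderingSketch {
      def main(args: Array[String]) {
        val hook = new Runnable {
          // Runs before the FileSystem shutdown hook, so HDFS is still usable here
          def run() { println("cleaning up while HDFS is still alive") }
        }
        // Higher priority runs earlier; 30 is above the FileSystem hook's priority,
        // so the staging directory can still be deleted from inside this hook.
        ShutdownHookManager.get().addShutdownHook(hook, 30)
      }
    }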
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 076dd3c9b0..4e0e060ddc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,26 +17,31 @@
package org.apache.spark.deploy.yarn
-import java.net.{InetSocketAddress, URI}
+import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
+
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, Records}
+
import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
import scala.collection.JavaConversions._
+
import org.apache.spark.Logging
import org.apache.spark.util.Utils
-import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.spark.deploy.SparkHadoopUtil
class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
@@ -45,8 +50,15 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- val credentials = UserGroupInformation.getCurrentUser().getCredentials();
-
+ val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+ private val SPARK_STAGING: String = ".sparkStaging"
+ private val distCacheMgr = new ClientDistributedCacheManager()
+
+ // staging directory is private! -> rwx------
+ val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short)
+ // app files are world-readable and owner-writable -> rw-r--r--
+ val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644: Short)
+
def run() {
init(yarnConf)
start()
@@ -57,8 +69,9 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
verifyClusterResources(newApp)
val appContext = createApplicationSubmissionContext(appId)
- val localResources = prepareLocalResources(appId, "spark")
- val env = setupLaunchEnv(localResources)
+ val appStagingDir = getAppStagingDir(appId)
+ val localResources = prepareLocalResources(appStagingDir)
+ val env = setupLaunchEnv(localResources, appStagingDir)
val amContainer = createContainerLaunchContext(newApp, localResources, env)
appContext.setQueue(args.amQueue)
@@ -70,7 +83,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
monitorApplication(appId)
System.exit(0)
}
-
+
+ def getAppStagingDir(appId: ApplicationId): String = {
+ SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+ }
def logClusterResourceDetails() {
val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
@@ -109,11 +125,74 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setApplicationName(args.appName)
return appContext
}
-
- def prepareLocalResources(appId: ApplicationId, appName: String): HashMap[String, LocalResource] = {
+
+ /*
+ * See whether two file systems are the same.
+ */
+ private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+ val srcUri = srcFs.getUri()
+ val dstUri = destFs.getUri()
+ if (srcUri.getScheme() == null) {
+ return false
+ }
+ if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+ return false
+ }
+ var srcHost = srcUri.getHost()
+ var dstHost = dstUri.getHost()
+ if ((srcHost != null) && (dstHost != null)) {
+ try {
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+ } catch {
+ case e: UnknownHostException =>
+ return false
+ }
+ if (!srcHost.equals(dstHost)) {
+ return false
+ }
+ } else if (srcHost == null && dstHost != null) {
+ return false
+ } else if (srcHost != null && dstHost == null) {
+ return false
+ }
+ // check for ports
+ if (srcUri.getPort() != dstUri.getPort()) {
+ return false
+ }
+ return true
+ }
+
+ /**
+ * Copy the file into HDFS if needed.
+ */
+ private def copyRemoteFile(
+ dstDir: Path,
+ originalPath: Path,
+ replication: Short,
+ setPerms: Boolean = false): Path = {
+ val fs = FileSystem.get(conf)
+ val remoteFs = originalPath.getFileSystem(conf);
+ var newPath = originalPath
+ if (! compareFs(remoteFs, fs)) {
+ newPath = new Path(dstDir, originalPath.getName())
+ logInfo("Uploading " + originalPath + " to " + newPath)
+ FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
+ fs.setReplication(newPath, replication);
+ if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+ }
+ // resolve any symlinks in the URI path so that using a "current" symlink
+ // to point to a specific version shows the specific version
+ // in the distributed cache configuration
+ val qualPath = fs.makeQualified(newPath)
+ val fc = FileContext.getFileContext(qualPath.toUri(), conf)
+ val destPath = fc.resolvePath(qualPath)
+ destPath
+ }
+
+ def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
- val locaResources = HashMap[String, LocalResource]()
- // Upload Spark and the application JAR to the remote file system
+ // Upload Spark and the application JAR to the remote file system if necessary
// Add them as local resources to the AM
val fs = FileSystem.get(conf)
@@ -124,67 +203,95 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
System.exit(1)
}
}
+ val dst = new Path(fs.getHomeDirectory(), appStagingDir)
+ val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ val dstFs = dst.getFileSystem(conf)
+ dstFs.addDelegationTokens(delegTokenRenewer, credentials);
+ }
+ val localResources = HashMap[String, LocalResource]()
+ FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+
+ val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
+ if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
+ logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
+ System.exit(1)
+ }
+
+ Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
+ Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
if (! localPath.isEmpty()) {
- val src = new Path(localPath)
- val pathSuffix = appName + "/" + appId.getId() + destName
- val dst = new Path(fs.getHomeDirectory(), pathSuffix)
- logInfo("Uploading " + src + " to " + dst)
- fs.copyFromLocalFile(false, true, src, dst)
- val destStatus = fs.getFileStatus(dst)
-
- // get tokens for anything we upload to hdfs
- if (UserGroupInformation.isSecurityEnabled()) {
- fs.addDelegationTokens(delegTokenRenewer, credentials);
+ var localURI = new URI(localPath)
+ // if not specified, assume these are in the local filesystem to keep behavior like Hadoop
+ if (localURI.getScheme() == null) {
+ localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString())
}
+ val setPermissions = destName.equals(Client.APP_JAR)
+ val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ destName, statCache)
+ }
+ }
+
+ // handle any jars passed via --addJars
+ if ((args.addJars != null) && (!args.addJars.isEmpty())){
+ args.addJars.split(',').foreach { case file: String =>
+ val localURI = new URI(file.trim())
+ val localPath = new Path(localURI)
+ val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+ val destPath = copyRemoteFile(dst, localPath, replication)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ linkname, statCache, true)
+ }
+ }
+
+ // handle any distributed cache files
+ if ((args.files != null) && (!args.files.isEmpty())){
+ args.files.split(',').foreach { case file: String =>
+ val localURI = new URI(file.trim())
+ val localPath = new Path(localURI)
+ val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+ val destPath = copyRemoteFile(dst, localPath, replication)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ linkname, statCache)
+ }
+ }
- val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- amJarRsrc.setType(LocalResourceType.FILE)
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst))
- amJarRsrc.setTimestamp(destStatus.getModificationTime())
- amJarRsrc.setSize(destStatus.getLen())
- locaResources(destName) = amJarRsrc
+ // handle any distributed cache archives
+ if ((args.archives != null) && (!args.archives.isEmpty())) {
+ args.archives.split(',').foreach { case file:String =>
+ val localURI = new URI(file.trim())
+ val localPath = new Path(localURI)
+ val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+ val destPath = copyRemoteFile(dst, localPath, replication)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
+ linkname, statCache)
}
}
+
UserGroupInformation.getCurrentUser().addCredentials(credentials);
- return locaResources
+ return localResources
}
- def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = {
+ def setupLaunchEnv(
+ localResources: HashMap[String, LocalResource],
+ stagingDir: String): HashMap[String, String] = {
logInfo("Setting up the launch environment")
- val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
+ val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
val env = new HashMap[String, String]()
- // If log4j present, ensure ours overrides all others
- if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
-
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
- Client.populateHadoopClasspath(yarnConf, env)
+ Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
env("SPARK_YARN_MODE") = "true"
- env("SPARK_YARN_JAR_PATH") =
- localResources("spark.jar").getResource().getScheme.toString() + "://" +
- localResources("spark.jar").getResource().getFile().toString()
- env("SPARK_YARN_JAR_TIMESTAMP") = localResources("spark.jar").getTimestamp().toString()
- env("SPARK_YARN_JAR_SIZE") = localResources("spark.jar").getSize().toString()
-
- env("SPARK_YARN_USERJAR_PATH") =
- localResources("app.jar").getResource().getScheme.toString() + "://" +
- localResources("app.jar").getResource().getFile().toString()
- env("SPARK_YARN_USERJAR_TIMESTAMP") = localResources("app.jar").getTimestamp().toString()
- env("SPARK_YARN_USERJAR_SIZE") = localResources("app.jar").getSize().toString()
-
- if (log4jConfLocalRes != null) {
- env("SPARK_YARN_LOG4J_PATH") =
- log4jConfLocalRes.getResource().getScheme.toString() + "://" + log4jConfLocalRes.getResource().getFile().toString()
- env("SPARK_YARN_LOG4J_TIMESTAMP") = log4jConfLocalRes.getTimestamp().toString()
- env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
- }
+ env("SPARK_YARN_STAGING_DIR") = stagingDir
+
+ // set the environment variables to be passed on to the Workers
+ distCacheMgr.setDistFilesEnv(env)
+ distCacheMgr.setDistArchivesEnv(env)
// allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
@@ -253,6 +360,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
+ if (args.userClass == null) {
+ logError("Error: You must specify a user class!")
+ System.exit(1)
+ }
+
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
@@ -320,6 +432,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
object Client {
+ val SPARK_JAR: String = "spark.jar"
+ val APP_JAR: String = "app.jar"
+ val LOG4J_PROP: String = "log4j.properties"
+
def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
// Note that anything with SPARK prefix gets propagated to all (remote) processes
@@ -335,4 +451,30 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
}
}
+
+ def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+ // If log4j present, ensure ours overrides all others
+ if (addLog4j) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + LOG4J_PROP)
+ }
+ // normally the user's app.jar is last, in case it conflicts with the spark jars
+ val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+ .toBoolean
+ if (userClasspathFirst) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + APP_JAR)
+ }
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + SPARK_JAR)
+ Client.populateHadoopClasspath(conf, env)
+
+ if (!userClasspathFirst) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + APP_JAR)
+ }
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + "*")
+ }
}
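
populateClasspath builds the container classpath in a fixed order: $PWD, optionally log4j.properties, then spark.jar, the Hadoop classpath, app.jar and finally $PWD/*, with spark.yarn.user.classpath.first=true moving app.jar ahead of spark.jar. A minimal sketch of exercising it (the property value and the fresh YarnConfiguration are illustrative):

    import scala.collection.mutable.HashMap
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.spark.deploy.yarn.Client

    object ClasspathOrderSketch {
      def main(args: Array[String]) {
        // With this property set, app.jar is added ahead of spark.jar
        System.setProperty("spark.yarn.user.classpath.first", "true")
        val env = new HashMap[String, String]()
        Client.populateClasspath(new YarnConfiguration(), true /* addLog4j */, env)
        println(env(Environment.CLASSPATH.name))
      }
    }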
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index c56dbd99ba..852dbd7dab 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,6 +24,9 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
class ClientArguments(val args: Array[String]) {
+ var addJars: String = null
+ var files: String = null
+ var archives: String = null
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
@@ -81,6 +84,17 @@ class ClientArguments(val args: Array[String]) {
case ("--name") :: value :: tail =>
appName = value
+
+ case ("--addJars") :: value :: tail =>
+ addJars = value
+ args = tail
+
+ case ("--files") :: value :: tail =>
+ files = value
+ args = tail
+
+ case ("--archives") :: value :: tail =>
+ archives = value
args = tail
case Nil =>
@@ -97,7 +111,7 @@ class ClientArguments(val args: Array[String]) {
inputFormatInfo = inputFormatMap.values.toList
}
-
+
def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
if (unknownParam != null) {
System.err.println("Unknown/unsupported param " + unknownParam)
@@ -113,10 +127,13 @@ class ClientArguments(val args: Array[String]) {
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unused right now.\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
- " --name NAME The name of your application (Default: Spark)\n" +
- " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
+ " --name NAME The name of your application (Default: Spark)\n" +
+ " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
+ " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
+ " --files files Comma separated list of files to be distributed with the job.\n" +
+ " --archives archives Comma separated list of archives to be distributed with the job."
)
System.exit(exitCode)
}
-
+
}
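
The three new options are plain comma-separated strings stored on the ClientArguments instance. A minimal sketch of parsing them (other options this class may require are omitted, so a real invocation could need more flags):

    import org.apache.spark.deploy.yarn.ClientArguments

    object ClientArgsSketch {
      def main(argStrings: Array[String]) {
        val parsed = new ClientArguments(Array(
          "--name", "MyApp",
          "--addJars", "lib/a.jar,lib/b.jar",
          "--files", "conf/app.conf",
          "--archives", "deps.tgz"))
        println(parsed.addJars)   // lib/a.jar,lib/b.jar
        println(parsed.files)     // conf/app.conf
        println(parsed.archives)  // deps.tgz
      }
    }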
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
new file mode 100644
index 0000000000..07686fefd7
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import org.apache.spark.Logging
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.LinkedHashMap
+import scala.collection.mutable.Map
+
+
+/** Client side methods to setup the Hadoop distributed cache */
+class ClientDistributedCacheManager() extends Logging {
+ private val distCacheFiles: Map[String, Tuple3[String, String, String]] =
+ LinkedHashMap[String, Tuple3[String, String, String]]()
+ private val distCacheArchives: Map[String, Tuple3[String, String, String]] =
+ LinkedHashMap[String, Tuple3[String, String, String]]()
+
+
+ /**
+ * Add a resource to the list of distributed cache resources. This list can
+ * be sent to the ApplicationMaster and possibly the workers so that it can
+ * be downloaded into the Hadoop distributed cache for use by this application.
+ * Adds the LocalResource to the localResources HashMap passed in and saves
+ * the stats of the resources so they can be sent to the workers and verified.
+ *
+ * @param fs FileSystem
+ * @param conf Configuration
+ * @param destPath path to the resource
+ * @param localResources localResource hashMap to insert the resource into
+ * @param resourceType LocalResourceType
+ * @param link link presented in the distributed cache to the destination
+ * @param statCache cache to store the file/directory stats
+ * @param appMasterOnly Whether to only add the resource to the app master
+ */
+ def addResource(
+ fs: FileSystem,
+ conf: Configuration,
+ destPath: Path,
+ localResources: HashMap[String, LocalResource],
+ resourceType: LocalResourceType,
+ link: String,
+ statCache: Map[URI, FileStatus],
+ appMasterOnly: Boolean = false) = {
+ val destStatus = fs.getFileStatus(destPath)
+ val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+ amJarRsrc.setType(resourceType)
+ val visibility = getVisibility(conf, destPath.toUri(), statCache)
+ amJarRsrc.setVisibility(visibility)
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
+ amJarRsrc.setTimestamp(destStatus.getModificationTime())
+ amJarRsrc.setSize(destStatus.getLen())
+ if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
+ localResources(link) = amJarRsrc
+
+ if (!appMasterOnly) {
+ val uri = destPath.toUri()
+ val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
+ if (resourceType == LocalResourceType.FILE) {
+ distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
+ destStatus.getModificationTime().toString(), visibility.name())
+ } else {
+ distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
+ destStatus.getModificationTime().toString(), visibility.name())
+ }
+ }
+ }
+
+ /**
+ * Adds the necessary cache file env variables to the env passed in
+ * @param env
+ */
+ def setDistFilesEnv(env: Map[String, String]) = {
+ val (keys, tupleValues) = distCacheFiles.unzip
+ val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+
+ if (keys.size > 0) {
+ env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+ env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
+ timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+ env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
+ sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+ env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
+ visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+ }
+ }
+
+ /**
+ * Adds the necessary cache archive env variables to the env passed in
+ * @param env
+ */
+ def setDistArchivesEnv(env: Map[String, String]) = {
+ val (keys, tupleValues) = distCacheArchives.unzip
+ val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+
+ if (keys.size > 0) {
+ env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+ env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
+ timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+ env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
+ sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+ env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
+ visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+ }
+ }
+
+ /**
+ * Returns the local resource visibility depending on the cache file permissions
+ * @param conf
+ * @param uri
+ * @param statCache
+ * @return LocalResourceVisibility
+ */
+ def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
+ LocalResourceVisibility = {
+ if (isPublic(conf, uri, statCache)) {
+ return LocalResourceVisibility.PUBLIC
+ }
+ return LocalResourceVisibility.PRIVATE
+ }
+
+ /**
+ * Returns a boolean to denote whether a cache file is visible to all (public)
+ * or not
+ * @param conf
+ * @param uri
+ * @param statCache
+ * @return true if the path in the uri is visible to all, false otherwise
+ */
+ def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
+ val fs = FileSystem.get(uri, conf)
+ val current = new Path(uri.getPath())
+ // the leaf level file should be readable by others
+ if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
+ return false
+ }
+ return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+ }
+
+ /**
+ * Returns true if all ancestors of the specified path have the 'execute'
+ * permission set for all users (i.e. that other users can traverse
+ * the directory hierarchy to the given path)
+ * @param fs
+ * @param path
+ * @param statCache
+ * @return true if all ancestors have the 'execute' permission set for all users
+ */
+ def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path,
+ statCache: Map[URI, FileStatus]): Boolean = {
+ var current = path
+ while (current != null) {
+ // the subdirs in the path should have execute permissions for others
+ if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
+ return false
+ }
+ current = current.getParent()
+ }
+ return true
+ }
+
+ /**
+ * Checks, for a given path, whether the 'other' permissions on it
+ * imply the permission in the passed FsAction
+ * @param fs
+ * @param path
+ * @param action
+ * @param statCache
+ * @return true if the path in the uri is visible to all, false otherwise
+ */
+ def checkPermissionOfOther(fs: FileSystem, path: Path,
+ action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
+ val status = getFileStatus(fs, path.toUri(), statCache)
+ val perms = status.getPermission()
+ val otherAction = perms.getOtherAction()
+ return otherAction.implies(action)
+ }
+
+ /**
+ * Checks to see if the given uri exists in the cache; if it does, it
+ * returns the existing FileStatus; otherwise it stats the uri, stores
+ * it in the cache, and returns the FileStatus.
+ * @param fs
+ * @param uri
+ * @param statCache
+ * @return FileStatus
+ */
+ def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
+ val stat = statCache.get(uri) match {
+ case Some(existstat) => existstat
+ case None =>
+ val newStat = fs.getFileStatus(new Path(uri))
+ statCache.put(uri, newStat)
+ newStat
+ }
+ return stat
+ }
+}
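
A minimal sketch of the round trip the manager implements: register a staged file under a link name, then export the parallel env variables for the workers. The fs, conf and destPath values are assumed to be in scope and are hypothetical:

    import java.net.URI
    import org.apache.hadoop.fs.FileStatus
    import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType}
    import scala.collection.mutable.{HashMap, Map}

    // fs: FileSystem and conf: Configuration are assumed to be in scope;
    // destPath is a Path already uploaded to the staging directory.
    val distCacheMgr = new ClientDistributedCacheManager()
    val localResources = HashMap[String, LocalResource]()
    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
    distCacheMgr.addResource(fs, conf, destPath, localResources,
      LocalResourceType.FILE, "mylink", statCache)
    val env = HashMap[String, String]()
    distCacheMgr.setDistFilesEnv(env)
    // env("SPARK_YARN_CACHE_FILES") now carries "<destPath>#mylink"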
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index a60e8a3007..7a66532254 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
" -XX:OnOutOfMemoryError='kill %p' " +
JAVA_OPTS +
- " org.apache.spark.executor.StandaloneExecutorBackend " +
+ " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
masterAddress + " " +
slaveId + " " +
hostname + " " +
@@ -137,61 +137,58 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
startReq.setContainerLaunchContext(ctx)
cm.startContainer(startReq)
}
+
+ private def setupDistributedCache(file: String,
+ rtype: LocalResourceType,
+ localResources: HashMap[String, LocalResource],
+ timestamp: String,
+ size: String,
+ vis: String) = {
+ val uri = new URI(file)
+ val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+ amJarRsrc.setType(rtype)
+ amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+ amJarRsrc.setTimestamp(timestamp.toLong)
+ amJarRsrc.setSize(size.toLong)
+ localResources(uri.getFragment()) = amJarRsrc
+ }
def prepareLocalResources: HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
- val locaResources = HashMap[String, LocalResource]()
+ val localResources = HashMap[String, LocalResource]()
- // Spark JAR
- val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- sparkJarResource.setType(LocalResourceType.FILE)
- sparkJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
- sparkJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
- new URI(System.getenv("SPARK_YARN_JAR_PATH"))))
- sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong)
- sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong)
- locaResources("spark.jar") = sparkJarResource
- // User JAR
- val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- userJarResource.setType(LocalResourceType.FILE)
- userJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
- userJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
- new URI(System.getenv("SPARK_YARN_USERJAR_PATH"))))
- userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong)
- userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong)
- locaResources("app.jar") = userJarResource
-
- // Log4j conf - if available
- if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
- val log4jConfResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- log4jConfResource.setType(LocalResourceType.FILE)
- log4jConfResource.setVisibility(LocalResourceVisibility.APPLICATION)
- log4jConfResource.setResource(ConverterUtils.getYarnUrlFromURI(
- new URI(System.getenv("SPARK_YARN_LOG4J_PATH"))))
- log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong)
- log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong)
- locaResources("log4j.properties") = log4jConfResource
+ if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+ val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+ val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+ val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+ val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+ for (i <- 0 until distFiles.length) {
+ setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
+ fileSizes(i), visibilities(i))
+ }
}
+ if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+ val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+ val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+ val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+ val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
+ for (i <- 0 until distArchives.length) {
+ setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
+ timeStamps(i), fileSizes(i), visibilities(i))
+ }
+ }
- logInfo("Prepared Local resources " + locaResources)
- return locaResources
+ logInfo("Prepared Local resources " + localResources)
+ return localResources
}
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- // If log4j present, ensure ours overrides all others
- if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
- // Which is correct ?
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
- }
-
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
- Client.populateHadoopClasspath(yarnConf, env)
+ Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
// allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
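
setupDistributedCache rebuilds each LocalResource from four parallel comma-separated env variables, where index i of every array describes file i. A standalone sketch of that alignment, with hypothetical values in the format the client-side manager produces:

    object CacheEnvParseSketch {
      def main(args: Array[String]) {
        // Each URI carries its link name as a fragment after '#'
        val files  = "hdfs://nn/a#linkA,hdfs://nn/b#linkB".split(',')
        val stamps = "10,20".split(',')
        val sizes  = "100,200".split(',')
        val vis    = "PRIVATE,PUBLIC".split(',')
        for (i <- 0 until files.length) {
          println(files(i) + " ts=" + stamps(i) + " size=" + sizes(i) + " vis=" + vis(i))
        }
      }
    }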
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index d222f412a0..4beb5229fe 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -22,7 +22,7 @@ import org.apache.spark.util.Utils
import org.apache.spark.scheduler.SplitInfo
import scala.collection
import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
import org.apache.hadoop.yarn.util.{RackResolver, Records}
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
@@ -211,7 +211,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
val workerId = workerIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
- StandaloneSchedulerBackend.ACTOR_NAME)
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("launching container on " + containerId + " host " + workerHostname)
// just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but ..
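
The driver URL handed to each worker is an Akka actor path built from the spark.driver.host and spark.driver.port system properties plus the scheduler backend's actor name. A sketch with hypothetical host and port values:

    import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

    // "driver-host.example.com" and "7077" stand in for the values of
    // spark.driver.host and spark.driver.port
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      "driver-host.example.com", "7077", CoarseGrainedSchedulerBackend.ACTOR_NAME)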
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
new file mode 100644
index 0000000000..c0a2af0c6f
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI;
+
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito.when
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
+
+class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
+
+ class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
+ override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
+ LocalResourceVisibility = {
+ return LocalResourceVisibility.PRIVATE
+ }
+ }
+
+ test("test getFileStatus empty") {
+ val distMgr = new ClientDistributedCacheManager()
+ val fs = mock[FileSystem]
+ val uri = new URI("/tmp/testing")
+ when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
+ val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ val stat = distMgr.getFileStatus(fs, uri, statCache)
+ assert(stat.getPath() === null)
+ }
+
+ test("test getFileStatus cached") {
+ val distMgr = new ClientDistributedCacheManager()
+ val fs = mock[FileSystem]
+ val uri = new URI("/tmp/testing")
+ val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
+ null, new Path("/tmp/testing"))
+ when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
+ val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
+ val stat = distMgr.getFileStatus(fs, uri, statCache)
+ assert(stat.getPath().toString() === "/tmp/testing")
+ }
+
+ test("test addResource") {
+ val distMgr = new MockClientDistributedCacheManager()
+ val fs = mock[FileSystem]
+ val conf = new Configuration()
+ val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+ val localResources = HashMap[String, LocalResource]()
+ val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
+
+ distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
+ statCache, false)
+ val resource = localResources("link")
+ assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+ assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+ assert(resource.getTimestamp() === 0)
+ assert(resource.getSize() === 0)
+ assert(resource.getType() === LocalResourceType.FILE)
+
+ val env = new HashMap[String, String]()
+ distMgr.setDistFilesEnv(env)
+ assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+ assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
+ assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
+ assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
+
+ distMgr.setDistArchivesEnv(env)
+ assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
+ assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
+ assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
+ assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+
+ // add another one and verify both are there and in the correct order
+ val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
+ null, new Path("/tmp/testing2"))
+ val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
+ when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
+ distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
+ statCache, false)
+ val resource2 = localResources("link2")
+ assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
+ assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
+ assert(resource2.getTimestamp() === 10)
+ assert(resource2.getSize() === 20)
+ assert(resource2.getType() === LocalResourceType.FILE)
+
+ val env2 = new HashMap[String, String]()
+ distMgr.setDistFilesEnv(env2)
+ val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+ val files = env2("SPARK_YARN_CACHE_FILES").split(',')
+ val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+ val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+ assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
+ assert(timestamps(0) === "0")
+ assert(sizes(0) === "0")
+ assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
+
+ assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
+ assert(timestamps(1) === "10")
+ assert(sizes(1) === "20")
+ assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
+ }
+
+ test("test addResource link null") {
+ val distMgr = new MockClientDistributedCacheManager()
+ val fs = mock[FileSystem]
+ val conf = new Configuration()
+ val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+ val localResources = HashMap[String, LocalResource]()
+ val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
+
+ intercept[Exception] {
+ distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
+ statCache, false)
+ }
+ assert(localResources.get("link") === None)
+ assert(localResources.size === 0)
+ }
+
+ test("test addResource appmaster only") {
+ val distMgr = new MockClientDistributedCacheManager()
+ val fs = mock[FileSystem]
+ val conf = new Configuration()
+ val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+ val localResources = HashMap[String, LocalResource]()
+ val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
+ null, new Path("/tmp/testing"))
+ when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
+
+ distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
+ statCache, true)
+ val resource = localResources("link")
+ assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+ assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+ assert(resource.getTimestamp() === 10)
+ assert(resource.getSize() === 20)
+ assert(resource.getType() === LocalResourceType.ARCHIVE)
+
+ val env = new HashMap[String, String]()
+ distMgr.setDistFilesEnv(env)
+ assert(env.get("SPARK_YARN_CACHE_FILES") === None)
+ assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
+ assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
+ assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+
+ distMgr.setDistArchivesEnv(env)
+ assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
+ assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
+ assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
+ assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+ }
+
+ test("test addResource archive") {
+ val distMgr = new MockClientDistributedCacheManager()
+ val fs = mock[FileSystem]
+ val conf = new Configuration()
+ val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+ val localResources = HashMap[String, LocalResource]()
+ val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
+ null, new Path("/tmp/testing"))
+ when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
+
+ distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
+ statCache, false)
+ val resource = localResources("link")
+ assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+ assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+ assert(resource.getTimestamp() === 10)
+ assert(resource.getSize() === 20)
+ assert(resource.getType() === LocalResourceType.ARCHIVE)
+
+ val env = new HashMap[String, String]()
+
+ distMgr.setDistArchivesEnv(env)
+ assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+ assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
+ assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
+ assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
+
+ distMgr.setDistFilesEnv(env)
+ assert(env.get("SPARK_YARN_CACHE_FILES") === None)
+ assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
+ assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
+ assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+ }
+
+
+}