diff options
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 49 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 12 |
2 files changed, 25 insertions, 36 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 7c168ed279..ae5fb6bbd4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -117,6 +117,11 @@ private[spark] class Client( private var appId: ApplicationId = null + // The app staging dir based on the STAGING_DIR configuration if configured + // otherwise based on the users home directory. + private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) } + .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory()) + def reportLauncherState(state: SparkAppHandle.State): Unit = { launcherBackend.setState(state) } @@ -179,18 +184,17 @@ private[spark] class Client( * Cleanup application staging directory. */ private def cleanupStagingDir(appId: ApplicationId): Unit = { - val appStagingDir = getAppStagingDir(appId) + val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) try { val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) - val fs = FileSystem.get(hadoopConf) - val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir) + val fs = stagingDirPath.getFileSystem(hadoopConf) if (!preserveFiles && fs.exists(stagingDirPath)) { logInfo("Deleting staging directory " + stagingDirPath) fs.delete(stagingDirPath, true) } } catch { case ioe: IOException => - logWarning("Failed to cleanup staging dir " + appStagingDir, ioe) + logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe) } } @@ -351,14 +355,13 @@ private[spark] class Client( * Exposed for testing. */ def prepareLocalResources( - appStagingDir: String, + destDir: Path, pySparkArchives: Seq[String]): HashMap[String, LocalResource] = { logInfo("Preparing resources for our AM container") // Upload Spark and the application JAR to the remote file system if necessary, // and add them as local resources to the application master. - val fs = FileSystem.get(hadoopConf) - val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir) - val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst + val fs = destDir.getFileSystem(hadoopConf) + val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + destDir YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials) // Used to keep track of URIs added to the distributed cache. If the same URI is added // multiple times, YARN will fail to launch containers for the app with an internal @@ -372,9 +375,9 @@ private[spark] class Client( YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials) val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort) - .getOrElse(fs.getDefaultReplication(dst)) + .getOrElse(fs.getDefaultReplication(destDir)) val localResources = HashMap[String, LocalResource]() - FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION)) + FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION)) val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() @@ -422,7 +425,7 @@ private[spark] class Client( val localPath = getQualifiedLocalPath(localURI, hadoopConf) val linkname = targetDir.map(_ + "/").getOrElse("") + destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName()) - val destPath = copyFileToRemote(dst, localPath, replication) + val destPath = copyFileToRemote(destDir, localPath, replication) val destFs = FileSystem.get(destPath.toUri(), hadoopConf) distCacheMgr.addResource( destFs, hadoopConf, destPath, localResources, resType, linkname, statCache, @@ -666,17 +669,15 @@ private[spark] class Client( * Set up the environment for launching our ApplicationMaster container. */ private def setupLaunchEnv( - stagingDir: String, + stagingDirPath: Path, pySparkArchives: Seq[String]): HashMap[String, String] = { logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH)) env("SPARK_YARN_MODE") = "true" - env("SPARK_YARN_STAGING_DIR") = stagingDir + env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() if (loginFromKeytab) { - val remoteFs = FileSystem.get(hadoopConf) - val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir) val credentialsFile = "credentials-" + UUID.randomUUID().toString sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) logInfo(s"Credentials file set to: $credentialsFile") @@ -776,15 +777,15 @@ private[spark] class Client( : ContainerLaunchContext = { logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId - val appStagingDir = getAppStagingDir(appId) + val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) val pySparkArchives = if (sparkConf.get(IS_PYTHON_APP)) { findPySparkArchives() } else { Nil } - val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives) - val localResources = prepareLocalResources(appStagingDir, pySparkArchives) + val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives) + val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives) // Set the environment variables to be passed on to the executors. distCacheMgr.setDistFilesEnv(launchEnv) @@ -1446,16 +1447,4 @@ private object Client extends Logging { uri.startsWith(s"$LOCAL_SCHEME:") } - /** - * Returns the app staging dir based on the STAGING_DIR configuration if configured - * otherwise based on the users home directory. - */ - private def getAppStagingDirPath( - conf: SparkConf, - fs: FileSystem, - appStagingDir: String): Path = { - val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory()) - new Path(baseDir, appStagingDir) - } - } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 23050e8c1d..06efd44b5d 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -147,7 +147,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val tempDir = Utils.createTempDir() try { - client.prepareLocalResources(tempDir.getAbsolutePath(), Nil) + client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) sparkConf.get(APP_JAR) should be (Some(USER)) // The non-local path should be propagated by name only, since it will end up in the app's @@ -238,7 +238,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val client = createClient(sparkConf) val tempDir = Utils.createTempDir() - client.prepareLocalResources(tempDir.getAbsolutePath(), Nil) + client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) assert(sparkConf.get(SPARK_JARS) === Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*"))) @@ -260,14 +260,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath()) val client = createClient(sparkConf) - client.prepareLocalResources(temp.getAbsolutePath(), Nil) + client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort()) classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath()) intercept[IllegalArgumentException] { - client.prepareLocalResources(temp.getAbsolutePath(), Nil) + client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) } } @@ -280,7 +280,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath())) val client = createClient(sparkConf) - client.prepareLocalResources(temp.getAbsolutePath(), Nil) + client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort()) classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) } @@ -308,7 +308,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val client = createClient(sparkConf) val tempDir = Utils.createTempDir() - client.prepareLocalResources(tempDir.getAbsolutePath(), Nil) + client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be // ignored. |