-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 67
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala | 61
-rw-r--r--  python/pyspark/shuffle.py | 2
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 18
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 19
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 18
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 19
14 files changed, 145 insertions, 118 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b4f321ec99..605df0e929 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -45,7 +45,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
- private val settings = new HashMap[String, String]()
+ private[spark] val settings = new HashMap[String, String]()
if (loadDefaults) {
// Load any spark.* system properties
@@ -210,6 +210,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
new SparkConf(false).setAll(settings)
}
+ /**
+ * By using this instead of System.getenv(), environment variables can be mocked
+ * in unit tests.
+ */
+ private[spark] def getenv(name: String): String = System.getenv(name)
+
/** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
* idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
private[spark] def validateSettings() {
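
The new getenv hook is what makes local-dir resolution testable: a unit test can subclass SparkConf and serve environment variables from a plain map instead of the real process environment. A minimal sketch of that pattern (the class name FakeEnvSparkConf is made up here; the new LocalDirsSuite later in this diff uses the same trick), assuming the subclass is compiled under the org.apache.spark package since getenv and settings are private[spark]:

    package org.apache.spark.sketch

    import org.apache.spark.SparkConf

    // Test-only SparkConf whose "environment" is an in-memory map; anything not in the
    // map falls through to the real process environment.
    class FakeEnvSparkConf(env: Map[String, String]) extends SparkConf(false) {
      override def getenv(name: String): String = env.getOrElse(name, super.getenv(name))
      // Keep the fake environment when the conf is cloned.
      override def clone: SparkConf = new FakeEnvSparkConf(env).setAll(settings)
    }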
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 10210a2927..747023812f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -62,7 +62,7 @@ private[spark] class PythonRDD(
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
- envVars += ("SPARK_LOCAL_DIR" -> localdir) // it's also used in monitor thread
+ envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
// Start a thread to feed the process input from our parent's iterator
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fb3f7bd54b..2f76e532ae 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -62,16 +62,6 @@ private[spark] class Executor(
val conf = new SparkConf(true)
conf.setAll(properties)
- // If we are in yarn mode, systems can have different disk layouts so we must set it
- // to what Yarn on this system said was available. This will be used later when SparkEnv
- // created.
- if (java.lang.Boolean.valueOf(
- System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
- conf.set("spark.local.dir", getYarnLocalDirs())
- } else if (sys.env.contains("SPARK_LOCAL_DIRS")) {
- conf.set("spark.local.dir", sys.env("SPARK_LOCAL_DIRS"))
- }
-
if (!isLocal) {
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
@@ -134,21 +124,6 @@ private[spark] class Executor(
threadPool.shutdown()
}
- /** Get the Yarn approved local directories. */
- private def getYarnLocalDirs(): String = {
- // Hadoop 0.23 and 2.x have different Environment variable names for the
- // local dirs, so lets check both. We assume one of the 2 is set.
- // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
- val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
- .getOrElse(Option(System.getenv("LOCAL_DIRS"))
- .getOrElse(""))
-
- if (localDirs.isEmpty) {
- throw new Exception("Yarn Local dirs can't be empty")
- }
- localDirs
- }
-
class TaskRunner(
execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
extends Runnable {
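
The YARN-specific branch removed here is not lost; it moves into Utils.getOrCreateLocalRootDirs (see the Utils.scala hunk below), so every process resolves its local directories through a single code path. A simplified, standalone sketch of that precedence (the function name is made up, and it reads the real environment via sys.env rather than the new mockable conf.getenv):

    // Precedence sketch: YARN container dirs, then SPARK_LOCAL_DIRS, then spark.local.dir,
    // then java.io.tmpdir. Comma-separated values are split into individual root dirs.
    def localRootDirsSketch(sparkLocalDir: Option[String]): Seq[String] = {
      val inYarnContainer =
        sys.env.contains("CONTAINER_ID") || sys.env.contains("YARN_LOCAL_DIRS")
      val raw =
        if (inYarnContainer) {
          sys.env.getOrElse("YARN_LOCAL_DIRS", sys.env.getOrElse("LOCAL_DIRS", ""))
        } else {
          sys.env.getOrElse("SPARK_LOCAL_DIRS",
            sparkLocalDir.getOrElse(System.getProperty("java.io.tmpdir")))
        }
      raw.split(',').toSeq
    }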
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c0491fb55e..12a92d44f4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -67,8 +67,7 @@ private[spark] class BlockManager(
private val port = conf.getInt("spark.blockManager.port", 0)
val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
- val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
- conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
val connectionManager =
new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index f3da816389..ec022ce9c0 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random, UUID}
-import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.PathResolver
import org.apache.spark.util.Utils
@@ -33,9 +33,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
* However, it is also possible to have a block map to only a segment of a file, by calling
* mapBlockToFileSegment().
*
- * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
+ * Block files are hashed among the directories listed in spark.local.dir (or in
+ * SPARK_LOCAL_DIRS, if it's set).
*/
-private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String)
+private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf)
extends PathResolver with Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@@ -46,7 +47,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
- val localDirs: Array[File] = createLocalDirs()
+ val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -130,10 +131,9 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
(blockId, getFile(blockId))
}
- private def createLocalDirs(): Array[File] = {
- logDebug(s"Creating local directories at root dirs '$rootDirs'")
+ private def createLocalDirs(conf: SparkConf): Array[File] = {
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
- rootDirs.split(",").flatMap { rootDir =>
+ Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
var foundLocalDir = false
var localDir: File = null
var localDirId: String = null
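
For context on what these root directories are used for: DiskBlockManager later hashes each block file into one of the root dirs and then into one of a fixed number of subdirectories, so no single directory accumulates an unbounded number of files. An illustrative sketch of that two-level scheme (the helper name and the exact hash are assumptions; the real logic lives in DiskBlockManager.getFile, which this hunk does not show):

    import java.io.File

    // Pick a root dir by hashing the file name, then a subdirectory within that root.
    def chooseBlockDir(filename: String, localDirs: Array[File], subDirsPerLocalDir: Int): File = {
      val hash = filename.hashCode & Int.MaxValue                    // non-negative hash
      val dirId = hash % localDirs.length                            // which root dir
      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir  // which subdir inside it
      new File(localDirs(dirId), "%02x".format(subDirId))
    }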
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 019f68b160..d6d74ce269 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -449,12 +449,71 @@ private[spark] object Utils extends Logging {
}
/**
- * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
- * return a single directory, even though the spark.local.dir property might be a list of
- * multiple paths.
+ * Get the path of a temporary directory. Spark's local directories can be configured through
+ * multiple settings, which are used with the following precedence:
+ *
+ * - If called from inside of a YARN container, this will return a directory chosen by YARN.
+ * - If the SPARK_LOCAL_DIRS environment variable is set, this will return a directory from it.
+ * - Otherwise, if spark.local.dir is set, this will return a directory from it.
+ * - Otherwise, this will return java.io.tmpdir.
+ *
+ * Some of these configuration options might be lists of multiple paths, but this method will
+ * always return a single directory.
*/
def getLocalDir(conf: SparkConf): String = {
- conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+ getOrCreateLocalRootDirs(conf)(0)
+ }
+
+ private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
+ // These environment variables are set by YARN.
+ // For Hadoop 0.23.X, we check for YARN_LOCAL_DIRS (we use this below in getYarnLocalDirs())
+ // For Hadoop 2.X, we check for CONTAINER_ID.
+ conf.getenv("CONTAINER_ID") != null || conf.getenv("YARN_LOCAL_DIRS") != null
+ }
+
+ /**
+ * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
+ * and returns only the directories that exist / could be created.
+ *
+ * If no directories could be created, this will return an empty list.
+ */
+ private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+ val confValue = if (isRunningInYarnContainer(conf)) {
+ // If we are in yarn mode, systems can have different disk layouts so we must set it
+ // to what Yarn on this system said was available.
+ getYarnLocalDirs(conf)
+ } else {
+ Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
+ conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ }
+ val rootDirs = confValue.split(',')
+ logDebug(s"Getting/creating local root dirs at '$confValue'")
+
+ rootDirs.flatMap { rootDir =>
+ val localDir: File = new File(rootDir)
+ val foundLocalDir = localDir.exists || localDir.mkdirs()
+ if (!foundLocalDir) {
+ logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.")
+ None
+ } else {
+ Some(rootDir)
+ }
+ }
+ }
+
+ /** Get the Yarn approved local directories. */
+ private def getYarnLocalDirs(conf: SparkConf): String = {
+ // Hadoop 0.23 and 2.x have different environment variable names for the
+ // local dirs, so let's check both. We assume one of the two is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
+ .getOrElse(Option(conf.getenv("LOCAL_DIRS"))
+ .getOrElse(""))
+
+ if (localDirs.isEmpty) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ localDirs
}
/**
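
A small usage sketch of the new helpers, in the same spirit as the LocalDirsSuite added below. It assumes the snippet is compiled under the org.apache.spark package (Utils and getOrCreateLocalRootDirs are private[spark]) and that the current user cannot create directories directly under /, so the bogus entry is dropped:

    package org.apache.spark.sketch

    import java.io.File

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    object LocalDirsSketch {
      def main(args: Array[String]): Unit = {
        // One bogus entry and one real one: the bogus entry is logged and dropped.
        val conf = new SparkConf(false)
          .set("spark.local.dir", "/NONEXISTENT_PATH," + System.getProperty("java.io.tmpdir"))
        val roots = Utils.getOrCreateLocalRootDirs(conf)  // only the creatable dirs survive
        val first = Utils.getLocalDir(conf)               // same as roots(0)
        assert(new File(first).exists())
        println(s"usable local root dirs: ${roots.mkString(", ")}")
      }
    }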
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 20bac66105..f32ce6f9fc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -825,8 +825,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val blockManager = mock(classOf[BlockManager])
val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
when(shuffleBlockManager.conf).thenReturn(conf)
- val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
- System.getProperty("java.io.tmpdir"))
+ val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 777579bc57..aabaeadd7a 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -71,7 +71,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
}
override def beforeEach() {
- diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
+ val conf = testConf.clone
+ conf.set("spark.local.dir", rootDirs)
+ diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
shuffleBlockManager.idToSegmentMap.clear()
}
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
new file mode 100644
index 0000000000..dae7bf0e33
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.File
+
+import org.apache.spark.util.Utils
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+
+
+/**
+ * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
+ */
+class LocalDirsSuite extends FunSuite {
+
+ test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
+ // Regression test for SPARK-2974
+ assert(!new File("/NONEXISTENT_DIR").exists())
+ val conf = new SparkConf(false)
+ .set("spark.local.dir", s"/NONEXISTENT_PATH,${System.getProperty("java.io.tmpdir")}")
+ assert(new File(Utils.getLocalDir(conf)).exists())
+ }
+
+ test("SPARK_LOCAL_DIRS override also affects driver") {
+ // Regression test for SPARK-2975
+ assert(!new File("/NONEXISTENT_DIR").exists())
+ // SPARK_LOCAL_DIRS is a valid directory:
+ class MySparkConf extends SparkConf(false) {
+ override def getenv(name: String) = {
+ if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
+ else super.getenv(name)
+ }
+
+ override def clone: SparkConf = {
+ new MySparkConf().setAll(settings)
+ }
+ }
+ // spark.local.dir only contains invalid directories, but that's not a problem since
+ // SPARK_LOCAL_DIRS will override it on both the driver and workers:
+ val conf = new MySparkConf().set("spark.local.dir", "/NONEXISTENT_PATH")
+ assert(new File(Utils.getLocalDir(conf)).exists())
+ }
+
+}
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 2c68cd4921..1ebe7df418 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -214,7 +214,7 @@ class ExternalMerger(Merger):
def _get_dirs(self):
""" Get all the directories """
- path = os.environ.get("SPARK_LOCAL_DIR", "/tmp")
+ path = os.environ.get("SPARK_LOCAL_DIRS", "/tmp")
dirs = path.split(",")
return [os.path.join(d, "python", str(os.getpid()), str(id(self)))
for d in dirs]
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 46a01f5a9a..4d4848b1bd 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -72,10 +72,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false
def run() {
- // Setup the directories so things go to yarn approved directories rather
- // then user specified and /tmp.
- System.setProperty("spark.local.dir", getLocalDirs())
-
// set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
@@ -138,20 +134,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
params)
}
- /** Get the Yarn approved local directories. */
- private def getLocalDirs(): String = {
- // Hadoop 0.23 and 2.x have different Environment variable names for the
- // local dirs, so lets check both. We assume one of the 2 is set.
- // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
- val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
- .orElse(Option(System.getenv("LOCAL_DIRS")))
-
- localDirs match {
- case None => throw new Exception("Yarn Local dirs can't be empty")
- case Some(l) => l
- }
- }
-
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 72c7143edc..c3310fbc24 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -95,11 +95,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
def run() {
-
- // Setup the directories so things go to yarn approved directories rather
- // then user specified and /tmp.
- System.setProperty("spark.local.dir", getLocalDirs())
-
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
@@ -152,20 +147,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
System.exit(0)
}
- /** Get the Yarn approved local directories. */
- private def getLocalDirs(): String = {
- // Hadoop 0.23 and 2.x have different Environment variable names for the
- // local dirs, so lets check both. We assume one of the 2 is set.
- // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
- val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
- .orElse(Option(System.getenv("LOCAL_DIRS")))
-
- localDirs match {
- case None => throw new Exception("Yarn Local dirs can't be empty")
- case Some(l) => l
- }
- }
-
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9c2bcf17a8..1c4005fd8e 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -72,10 +72,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false
def run() {
- // Setup the directories so things go to YARN approved directories rather
- // than user specified and /tmp.
- System.setProperty("spark.local.dir", getLocalDirs())
-
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
@@ -144,20 +140,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
"spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
}
- // Get the Yarn approved local directories.
- private def getLocalDirs(): String = {
- // Hadoop 0.23 and 2.x have different Environment variable names for the
- // local dirs, so lets check both. We assume one of the 2 is set.
- // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
- val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
- .orElse(Option(System.getenv("LOCAL_DIRS")))
-
- localDirs match {
- case None => throw new Exception("Yarn local dirs can't be empty")
- case Some(l) => l
- }
- }
-
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a7585748b7..45925f1fea 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -94,11 +94,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
def run() {
-
- // Setup the directories so things go to yarn approved directories rather
- // then user specified and /tmp.
- System.setProperty("spark.local.dir", getLocalDirs())
-
amClient = AMRMClient.createAMRMClient()
amClient.init(yarnConf)
amClient.start()
@@ -141,20 +136,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
System.exit(0)
}
- /** Get the Yarn approved local directories. */
- private def getLocalDirs(): String = {
- // Hadoop 0.23 and 2.x have different Environment variable names for the
- // local dirs, so lets check both. We assume one of the 2 is set.
- // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
- val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
- .orElse(Option(System.getenv("LOCAL_DIRS")))
-
- localDirs match {
- case None => throw new Exception("Yarn Local dirs can't be empty")
- case Some(l) => l
- }
- }
-
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")