aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei.zaharia@gmail.com>2013-08-28 12:44:46 -0700
committerMatei Zaharia <matei.zaharia@gmail.com>2013-08-28 12:44:46 -0700
commitbaa84e7e4c5e0afc8bc3b177379311d309c00cd2 (patch)
tree76aeeb15613a583c9472eefc6e82d3b9b582dd5c /core
parentcd043cf922692aa493308cf1e6da6f7522d80b78 (diff)
parentaac1214ee48ef143b0164f740380cdb0a5a7383b (diff)
downloadspark-baa84e7e4c5e0afc8bc3b177379311d309c00cd2.tar.gz
spark-baa84e7e4c5e0afc8bc3b177379311d309c00cd2.tar.bz2
spark-baa84e7e4c5e0afc8bc3b177379311d309c00cd2.zip
Merge pull request #865 from tgravescs/fixtmpdir
Spark on Yarn should use yarn approved directories for spark.local.dir and tmp
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala22
1 files changed, 22 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 036c7191ad..fa82d2b324 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -60,6 +60,13 @@ private[spark] class Executor(
System.setProperty(key, value)
}
+ // If we are in yarn mode, systems can have different disk layouts so we must set it
+ // to what Yarn on this system said was available. This will be used later when SparkEnv
+ // created.
+ if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
+ System.setProperty("spark.local.dir", getYarnLocalDirs())
+ }
+
// Create our ClassLoader and set it on this thread
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
@@ -107,6 +114,21 @@ private[spark] class Executor(
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
}
+ /** Get the Yarn approved local directories. */
+ private def getYarnLocalDirs(): String = {
+ // Hadoop 0.23 and 2.x have different Environment variable names for the
+ // local dirs, so lets check both. We assume one of the 2 is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+ .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+ .getOrElse(""))
+
+ if (localDirs.isEmpty()) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ return localDirs
+ }
+
class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {