diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-28 12:44:46 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-28 12:44:46 -0700 |
commit | baa84e7e4c5e0afc8bc3b177379311d309c00cd2 (patch) | |
tree | 76aeeb15613a583c9472eefc6e82d3b9b582dd5c /core | |
parent | cd043cf922692aa493308cf1e6da6f7522d80b78 (diff) | |
parent | aac1214ee48ef143b0164f740380cdb0a5a7383b (diff) | |
download | spark-baa84e7e4c5e0afc8bc3b177379311d309c00cd2.tar.gz spark-baa84e7e4c5e0afc8bc3b177379311d309c00cd2.tar.bz2 spark-baa84e7e4c5e0afc8bc3b177379311d309c00cd2.zip |
Merge pull request #865 from tgravescs/fixtmpdir
Spark on Yarn should use yarn approved directories for spark.local.dir and tmp
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/executor/Executor.scala | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 036c7191ad..fa82d2b324 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -60,6 +60,13 @@ private[spark] class Executor( System.setProperty(key, value) } + // If we are in yarn mode, systems can have different disk layouts so we must set it + // to what Yarn on this system said was available. This will be used later when SparkEnv + // created. + if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) { + System.setProperty("spark.local.dir", getYarnLocalDirs()) + } + // Create our ClassLoader and set it on this thread private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) @@ -107,6 +114,21 @@ private[spark] class Executor( threadPool.execute(new TaskRunner(context, taskId, serializedTask)) } + /** Get the Yarn approved local directories. */ + private def getYarnLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .getOrElse(Option(System.getenv("LOCAL_DIRS")) + .getOrElse("")) + + if (localDirs.isEmpty()) { + throw new Exception("Yarn Local dirs can't be empty") + } + return localDirs + } + class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) extends Runnable { |