diff options
author | Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com> | 2013-08-27 11:00:21 -0500 |
---|---|---|
committer | Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com> | 2013-08-27 11:00:21 -0500 |
commit | cf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb (patch) | |
tree | 5f403e38ba1a40941caa6d4c6e9fefb604b9c88a | |
parent | 6dd64e8bb2256b56e0908c628ebdb3b533adf432 (diff) | |
download | spark-cf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb.tar.gz spark-cf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb.tar.bz2 spark-cf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb.zip |
Allow for Executors to have different directories then the Spark Master for Yarn
-rw-r--r-- | core/src/main/scala/spark/executor/Executor.scala | 25 |
1 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 036c7191ad..9e0356a711 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -60,6 +60,13 @@ private[spark] class Executor( System.setProperty(key, value) } + // If we are in yarn mode, systems can have different disk layouts so we must set it + // to what Yarn on this system said was available. This will be used later when SparkEnv + // created. + if (java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) { + System.setProperty("spark.local.dir", getYarnLocalDirs()) + } + // Create our ClassLoader and set it on this thread private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) @@ -107,6 +114,24 @@ private[spark] class Executor( threadPool.execute(new TaskRunner(context, taskId, serializedTask)) } + /** Get the Yarn approved local directories. */ + private def getYarnLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + var localDirs = System.getenv("LOCAL_DIRS") + val yarnLocalSysDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + yarnLocalSysDirs match { + case Some(s) => localDirs = s + case None => { + if ((localDirs == null) || (localDirs.isEmpty())) { + throw new Exception("Yarn Local dirs can't be empty") + } + } + } + return localDirs + } + class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) extends Runnable { |