From cf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb Mon Sep 17 00:00:00 2001 From: "Y.CORP.YAHOO.COM\\tgraves" Date: Tue, 27 Aug 2013 11:00:21 -0500 Subject: Allow for Executors to have different directories then the Spark Master for Yarn --- core/src/main/scala/spark/executor/Executor.scala | 25 +++++++++++++++++++++++ 1 file changed, 25 insertions(+) (limited to 'core') diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 036c7191ad..9e0356a711 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -60,6 +60,13 @@ private[spark] class Executor( System.setProperty(key, value) } + // If we are in yarn mode, systems can have different disk layouts so we must set it + // to what Yarn on this system said was available. This will be used later when SparkEnv + // created. + if (java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) { + System.setProperty("spark.local.dir", getYarnLocalDirs()) + } + // Create our ClassLoader and set it on this thread private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) @@ -107,6 +114,24 @@ private[spark] class Executor( threadPool.execute(new TaskRunner(context, taskId, serializedTask)) } + /** Get the Yarn approved local directories. */ + private def getYarnLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + var localDirs = System.getenv("LOCAL_DIRS") + val yarnLocalSysDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + yarnLocalSysDirs match { + case Some(s) => localDirs = s + case None => { + if ((localDirs == null) || (localDirs.isEmpty())) { + throw new Exception("Yarn Local dirs can't be empty") + } + } + } + return localDirs + } + class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) extends Runnable { -- cgit v1.2.3