aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala25
1 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 036c7191ad..9e0356a711 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -60,6 +60,13 @@ private[spark] class Executor(
System.setProperty(key, value)
}
+ // If we are in yarn mode, systems can have different disk layouts so we must set it
+ // to what Yarn on this system said was available. This will be used later when SparkEnv
+ // created.
+ if (java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
+ System.setProperty("spark.local.dir", getYarnLocalDirs())
+ }
+
// Create our ClassLoader and set it on this thread
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
@@ -107,6 +114,24 @@ private[spark] class Executor(
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
}
+ /** Get the Yarn approved local directories. */
+ private def getYarnLocalDirs(): String = {
+ // Hadoop 0.23 and 2.x have different Environment variable names for the
+ // local dirs, so lets check both. We assume one of the 2 is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ var localDirs = System.getenv("LOCAL_DIRS")
+ val yarnLocalSysDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+ yarnLocalSysDirs match {
+ case Some(s) => localDirs = s
+ case None => {
+ if ((localDirs == null) || (localDirs.isEmpty())) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ }
+ }
+ return localDirs
+ }
+
class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {