aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorY.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-08-27 11:00:21 -0500
committerY.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-08-27 11:00:21 -0500
commitcf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb (patch)
tree5f403e38ba1a40941caa6d4c6e9fefb604b9c88a /core
parent6dd64e8bb2256b56e0908c628ebdb3b533adf432 (diff)
downloadspark-cf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb.tar.gz
spark-cf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb.tar.bz2
spark-cf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb.zip
Allow for Executors to have different directories then the Spark Master for Yarn
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala25
1 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 036c7191ad..9e0356a711 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -60,6 +60,13 @@ private[spark] class Executor(
System.setProperty(key, value)
}
+ // If we are in yarn mode, systems can have different disk layouts so we must set it
+ // to what Yarn on this system said was available. This will be used later when SparkEnv
+ // created.
+ if (java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
+ System.setProperty("spark.local.dir", getYarnLocalDirs())
+ }
+
// Create our ClassLoader and set it on this thread
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
@@ -107,6 +114,24 @@ private[spark] class Executor(
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
}
+ /** Get the Yarn approved local directories. */
+ private def getYarnLocalDirs(): String = {
+ // Hadoop 0.23 and 2.x have different Environment variable names for the
+ // local dirs, so lets check both. We assume one of the 2 is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ var localDirs = System.getenv("LOCAL_DIRS")
+ val yarnLocalSysDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+ yarnLocalSysDirs match {
+ case Some(s) => localDirs = s
+ case None => {
+ if ((localDirs == null) || (localDirs.isEmpty())) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ }
+ }
+ return localDirs
+ }
+
class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {