diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-28 12:44:46 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-28 12:44:46 -0700 |
commit | baa84e7e4c5e0afc8bc3b177379311d309c00cd2 (patch) | |
tree | 76aeeb15613a583c9472eefc6e82d3b9b582dd5c | |
parent | cd043cf922692aa493308cf1e6da6f7522d80b78 (diff) | |
parent | aac1214ee48ef143b0164f740380cdb0a5a7383b (diff) | |
download | spark-baa84e7e4c5e0afc8bc3b177379311d309c00cd2.tar.gz spark-baa84e7e4c5e0afc8bc3b177379311d309c00cd2.tar.bz2 spark-baa84e7e4c5e0afc8bc3b177379311d309c00cd2.zip |
Merge pull request #865 from tgravescs/fixtmpdir
Spark on Yarn should use yarn approved directories for spark.local.dir and tmp
-rw-r--r-- | core/src/main/scala/spark/executor/Executor.scala | 22 | ||||
-rw-r--r-- | docs/running-on-yarn.md | 4 | ||||
-rw-r--r-- | yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala | 18 | ||||
-rw-r--r-- | yarn/src/main/scala/spark/deploy/yarn/Client.scala | 4 | ||||
-rw-r--r-- | yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala | 4 |
5 files changed, 49 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 036c7191ad..fa82d2b324 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -60,6 +60,13 @@ private[spark] class Executor( System.setProperty(key, value) } + // If we are in yarn mode, systems can have different disk layouts so we must set it + // to what Yarn on this system said was available. This will be used later when SparkEnv + // created. + if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) { + System.setProperty("spark.local.dir", getYarnLocalDirs()) + } + // Create our ClassLoader and set it on this thread private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) @@ -107,6 +114,21 @@ private[spark] class Executor( threadPool.execute(new TaskRunner(context, taskId, serializedTask)) } + /** Get the Yarn approved local directories. */ + private def getYarnLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .getOrElse(Option(System.getenv("LOCAL_DIRS")) + .getOrElse("")) + + if (localDirs.isEmpty()) { + throw new Exception("Yarn Local dirs can't be empty") + } + return localDirs + } + class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) extends Runnable { diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index cac9c5e4b6..1a0afd19d4 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -63,7 +63,6 @@ The command to launch the YARN Client is as follows: --master-memory <MEMORY_FOR_MASTER> \ --worker-memory <MEMORY_PER_WORKER> \ --worker-cores <CORES_PER_WORKER> \ - --user <hadoop_user> \ --queue <queue_name> For example: @@ -83,5 +82,4 @@ The above starts a YARN Client programs which periodically polls the Application - When your application instantiates a Spark context it must use a special "yarn-standalone" master url. This starts the scheduler without forcing it to connect to a cluster. A good way to handle this is to pass "yarn-standalone" as an argument to your program, as shown in the example above. - We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed. -- Currently, we have not yet integrated with hadoop security. If --user is present, the hadoop_user specified will be used to run the tasks on the cluster. If unspecified, current user will be used (which should be valid in cluster). - Once hadoop security support is added, and if hadoop cluster is enabled with security, additional restrictions would apply via delegation tokens passed. +- The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored. diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala index 15dbd1c0fb..0f3b6bc1a6 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -47,6 +47,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private var isFinished:Boolean = false def run() { + // setup the directories so things go to yarn approved directories rather + // then user specified and /tmp + System.setProperty("spark.local.dir", getLocalDirs()) appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() @@ -89,6 +92,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e System.exit(0) } + + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .getOrElse(Option(System.getenv("LOCAL_DIRS")) + .getOrElse("")) + + if (localDirs.isEmpty()) { + throw new Exception("Yarn Local dirs can't be empty") + } + return localDirs + } private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() diff --git a/yarn/src/main/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala index e84fb7c985..eb2a8cc642 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala @@ -223,6 +223,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Add Xmx for am memory JAVA_OPTS += "-Xmx" + amMemory + "m " + JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + + // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same // node, spark gc effects all other containers performance (which can also be other spark containers) diff --git a/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala index 3e007509e6..0e1fd9b680 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala @@ -75,6 +75,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S if (env.isDefinedAt("SPARK_JAVA_OPTS")) { JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } + + JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same // node, spark gc effects all other containers performance (which can also be other spark containers) |