From df3d559b32f1ceb8ca3491e2a1169c56a6faab58 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 26 Feb 2015 17:35:03 +0000 Subject: [SPARK-5801] [core] Avoid creating nested directories. Cache the value of the local root dirs to use for storing local data, so that the same directories are reused. Also, to avoid an extra level of nesting, use a different env variable to propagate the local dirs from the Worker to the executors. And make the executor directory use a different name. Author: Marcelo Vanzin Closes #4747 from vanzin/SPARK-5801 and squashes the following commits: e0114e1 [Marcelo Vanzin] Update unit test. 18ee0a7 [Marcelo Vanzin] [SPARK-5801] [core] Avoid creating nested directories. --- .../spark/deploy/worker/ExecutorRunner.scala | 2 +- .../org/apache/spark/deploy/worker/Worker.scala | 4 ++-- .../main/scala/org/apache/spark/util/Utils.scala | 23 ++++++++++++++++++++++ .../org/apache/spark/storage/LocalDirsSuite.scala | 8 ++++++-- 4 files changed, 32 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index bea04cd542..6653aca0a0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -135,7 +135,7 @@ private[spark] class ExecutorRunner( logInfo("Launch command: " + command.mkString("\"", "\" \"", "\"")) builder.directory(executorDir) - builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(",")) + builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator)) // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 10929eb516..2473a90aa9 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -345,11 +345,11 @@ private[spark] class Worker( } // Create local dirs for the executor. These are passed to the executor via the - // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the + // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the // application finishes. val appLocalDirs = appDirectories.get(appId).getOrElse { Utils.getOrCreateLocalRootDirs(conf).map { dir => - Utils.createDirectory(dir).getAbsolutePath() + Utils.createDirectory(dir, namePrefix = "executor").getAbsolutePath() }.toSeq } appDirectories(appId) = appLocalDirs diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1396f167eb..4644088f19 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -63,6 +63,7 @@ private[spark] object Utils extends Logging { val random = new Random() private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 + @volatile private var localRootDirs: Array[String] = null /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { @@ -683,14 +684,31 @@ private[spark] object Utils extends Logging { * and returns only the directories that exist / could be created. * * If no directories could be created, this will return an empty list. + * + * This method will cache the local directories for the application when it's first invoked. + * So calling it multiple times with a different configuration will always return the same + * set of directories. */ private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = { + if (localRootDirs == null) { + this.synchronized { + if (localRootDirs == null) { + localRootDirs = getOrCreateLocalRootDirsImpl(conf) + } + } + } + localRootDirs + } + + private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = { if (isRunningInYarnContainer(conf)) { // If we are in yarn mode, systems can have different disk layouts so we must set it // to what Yarn on this system said was available. Note this assumes that Yarn has // created the directories already, and that they are secured so that only the // user has access to them. getYarnLocalDirs(conf).split(",") + } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) { + conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator) } else { // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user // configuration to point to a secure directory. So create a subdirectory with restricted @@ -734,6 +752,11 @@ private[spark] object Utils extends Logging { localDirs } + /** Used by unit tests. Do not call from other places. */ + private[spark] def clearLocalRootDirs(): Unit = { + localRootDirs = null + } + /** * Shuffle the elements of a collection into a random order, returning the * result in a new collection. Unlike scala.util.Random.shuffle, this method diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala index 8cf951adb3..82a82e23ee 100644 --- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import java.io.File import org.apache.spark.util.Utils -import org.scalatest.FunSuite +import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.SparkConf @@ -28,7 +28,11 @@ import org.apache.spark.SparkConf /** * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options. */ -class LocalDirsSuite extends FunSuite { +class LocalDirsSuite extends FunSuite with BeforeAndAfter { + + before { + Utils.clearLocalRootDirs() + } test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") { // Regression test for SPARK-2974 -- cgit v1.2.3