author     Marcelo Vanzin <vanzin@cloudera.com>    2015-02-26 17:35:03 +0000
committer  Sean Owen <sowen@cloudera.com>          2015-02-26 17:35:03 +0000
commit     df3d559b32f1ceb8ca3491e2a1169c56a6faab58 (patch)
tree       e24a03547edfe1c1bf188b9fa00c0a4f9ad2d724 /core
parent     192e42a2933eb283e12bfdfb46e2ef895228af4a (diff)
[SPARK-5801] [core] Avoid creating nested directories.
Cache the value of the local root dirs to use for storing local data, so that
the same directories are reused. Also, to avoid an extra level of nesting, use
a different env variable to propagate the local dirs from the Worker to the
executors. And make the executor directory use a different name.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #4747 from vanzin/SPARK-5801 and squashes the following commits:

e0114e1 [Marcelo Vanzin] Update unit test.
18ee0a7 [Marcelo Vanzin] [SPARK-5801] [core] Avoid creating nested directories.
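The heart of the change is the cached, lazily initialized set of local root
dirs in Utils.scala below, guarded by double-checked locking on a @volatile
field. A minimal standalone sketch of that pattern, with illustrative names
(LocalDirsCache and compute are not part of Spark):

    object LocalDirsCache {
      // @volatile guarantees that once the array is assigned, other threads
      // see the fully constructed value on their unsynchronized read.
      @volatile private var cachedDirs: Array[String] = null

      // Double-checked locking: the first null check keeps the common,
      // already-initialized path lock-free; the synchronized block ensures
      // only one thread ever runs the (side-effecting) computation.
      def getOrCreate(compute: () => Array[String]): Array[String] = {
        if (cachedDirs == null) {
          this.synchronized {
            if (cachedDirs == null) {
              cachedDirs = compute()
            }
          }
        }
        cachedDirs
      }

      // Test-only reset, mirroring the clearLocalRootDirs() hook in the diff.
      def clear(): Unit = { cachedDirs = null }
    }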
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala |  2
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala         |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                   | 23
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala       |  8
4 files changed, 32 insertions, 5 deletions
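The Worker-to-executor handoff in the diff below changes from SPARK_LOCAL_DIRS
(comma-joined) to a new SPARK_EXECUTOR_DIRS variable joined with
File.pathSeparator; the executor then uses those directories directly instead
of nesting fresh subdirectories under them. A hedged sketch of the round trip
(ExecutorDirsEnv is an illustrative helper, not Spark code):

    import java.io.File

    object ExecutorDirsEnv {
      // Worker side: join the per-app dirs with ':' (Unix) or ';' (Windows).
      def encode(dirs: Seq[String]): String = dirs.mkString(File.pathSeparator)

      // Executor side: split the value back apart. Neither ':' nor ';' is a
      // regex metacharacter, so String.split's regex semantics are safe here.
      def decode(value: String): Array[String] = value.split(File.pathSeparator)
    }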
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index bea04cd542..6653aca0a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -135,7 +135,7 @@ private[spark] class ExecutorRunner(
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
builder.directory(executorDir)
- builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
+ builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 10929eb516..2473a90aa9 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -345,11 +345,11 @@ private[spark] class Worker(
}
// Create local dirs for the executor. These are passed to the executor via the
- // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+ // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
// application finishes.
val appLocalDirs = appDirectories.get(appId).getOrElse {
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
- Utils.createDirectory(dir).getAbsolutePath()
+ Utils.createDirectory(dir, namePrefix = "executor").getAbsolutePath()
}.toSeq
}
appDirectories(appId) = appLocalDirs
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1396f167eb..4644088f19 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -63,6 +63,7 @@ private[spark] object Utils extends Logging {
val random = new Random()
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+ @volatile private var localRootDirs: Array[String] = null
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
@@ -683,14 +684,31 @@ private[spark] object Utils extends Logging {
* and returns only the directories that exist / could be created.
*
* If no directories could be created, this will return an empty list.
+ *
+ * This method will cache the local directories for the application when it's first invoked.
+ * So calling it multiple times with a different configuration will always return the same
+ * set of directories.
*/
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+ if (localRootDirs == null) {
+ this.synchronized {
+ if (localRootDirs == null) {
+ localRootDirs = getOrCreateLocalRootDirsImpl(conf)
+ }
+ }
+ }
+ localRootDirs
+ }
+
+ private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. Note this assumes that Yarn has
// created the directories already, and that they are secured so that only the
// user has access to them.
getYarnLocalDirs(conf).split(",")
+ } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
+ conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
} else {
// In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
// configuration to point to a secure directory. So create a subdirectory with restricted
@@ -734,6 +752,11 @@ private[spark] object Utils extends Logging {
localDirs
}
+ /** Used by unit tests. Do not call from other places. */
+ private[spark] def clearLocalRootDirs(): Unit = {
+ localRootDirs = null
+ }
+
/**
* Shuffle the elements of a collection into a random order, returning the
* result in a new collection. Unlike scala.util.Random.shuffle, this method
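Taken together, the Utils changes above establish a three-way resolution order
for the local root dirs. A hedged sketch of that order with illustrative names
(resolveLocalDirs is not an actual Spark method):

    import java.io.File

    def resolveLocalDirs(
        yarnDirs: Option[String],      // dirs exported by YARN, comma-separated
        executorDirs: Option[String],  // e.g. sys.env.get("SPARK_EXECUTOR_DIRS")
        configuredDirs: Seq[String]): Seq[String] = {
      yarnDirs.map(_.split(",").toSeq)                               // 1. trust YARN's layout
        .orElse(executorDirs.map(_.split(File.pathSeparator).toSeq)) // 2. trust the Worker's dirs
        .getOrElse(configuredDirs)  // 3. fall back to user config; the real code
                                    //    also creates a restricted subdirectory here
    }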
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index 8cf951adb3..82a82e23ee 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
import java.io.File
import org.apache.spark.util.Utils
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.SparkConf
@@ -28,7 +28,11 @@ import org.apache.spark.SparkConf
/**
* Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
*/
-class LocalDirsSuite extends FunSuite {
+class LocalDirsSuite extends FunSuite with BeforeAndAfter {
+
+ before {
+ Utils.clearLocalRootDirs()
+ }
test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
// Regression test for SPARK-2974