aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala2
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala23
-rw-r--r--core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala8
4 files changed, 32 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index bea04cd542..6653aca0a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -135,7 +135,7 @@ private[spark] class ExecutorRunner(
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
builder.directory(executorDir)
- builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
+ builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 10929eb516..2473a90aa9 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -345,11 +345,11 @@ private[spark] class Worker(
}
// Create local dirs for the executor. These are passed to the executor via the
- // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+ // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
// application finishes.
val appLocalDirs = appDirectories.get(appId).getOrElse {
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
- Utils.createDirectory(dir).getAbsolutePath()
+ Utils.createDirectory(dir, namePrefix = "executor").getAbsolutePath()
}.toSeq
}
appDirectories(appId) = appLocalDirs
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1396f167eb..4644088f19 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -63,6 +63,7 @@ private[spark] object Utils extends Logging {
val random = new Random()
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+ @volatile private var localRootDirs: Array[String] = null
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
@@ -683,14 +684,31 @@ private[spark] object Utils extends Logging {
* and returns only the directories that exist / could be created.
*
* If no directories could be created, this will return an empty list.
+ *
+ * This method will cache the local directories for the application when it's first invoked.
+ * So calling it multiple times with a different configuration will always return the same
+ * set of directories.
*/
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+ if (localRootDirs == null) {
+ this.synchronized {
+ if (localRootDirs == null) {
+ localRootDirs = getOrCreateLocalRootDirsImpl(conf)
+ }
+ }
+ }
+ localRootDirs
+ }
+
+ private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. Note this assumes that Yarn has
// created the directories already, and that they are secured so that only the
// user has access to them.
getYarnLocalDirs(conf).split(",")
+ } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
+ conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
} else {
// In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
// configuration to point to a secure directory. So create a subdirectory with restricted
@@ -734,6 +752,11 @@ private[spark] object Utils extends Logging {
localDirs
}
+ /** Used by unit tests. Do not call from other places. */
+ private[spark] def clearLocalRootDirs(): Unit = {
+ localRootDirs = null
+ }
+
/**
* Shuffle the elements of a collection into a random order, returning the
* result in a new collection. Unlike scala.util.Random.shuffle, this method
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index 8cf951adb3..82a82e23ee 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
import java.io.File
import org.apache.spark.util.Utils
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.SparkConf
@@ -28,7 +28,11 @@ import org.apache.spark.SparkConf
/**
* Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
*/
-class LocalDirsSuite extends FunSuite {
+class LocalDirsSuite extends FunSuite with BeforeAndAfter {
+
+ before {
+ Utils.clearLocalRootDirs()
+ }
test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
// Regression test for SPARK-2974