path: root/core
author     Josh Rosen <joshrosen@apache.org>    2014-08-19 22:42:50 -0700
committer  Patrick Wendell <pwendell@gmail.com> 2014-08-19 22:42:50 -0700
commit     ebcb94f701273b56851dade677e047388a8bca09 (patch)
tree       7cb860a5398134f94d80c21a4a8967213b644f7d /core
parent     0a984aa155fb7f532fe87620dcf1a2814c5b8b49 (diff)
download   spark-ebcb94f701273b56851dade677e047388a8bca09.tar.gz
           spark-ebcb94f701273b56851dade677e047388a8bca09.tar.bz2
           spark-ebcb94f701273b56851dade677e047388a8bca09.zip
[SPARK-2974] [SPARK-2975] Fix two bugs related to spark.local.dirs
This PR fixes two bugs related to `spark.local.dirs` and `SPARK_LOCAL_DIRS`, one where `Utils.getLocalDir()` might return an invalid directory (SPARK-2974) and another where the `SPARK_LOCAL_DIRS` override didn't affect the driver, which could cause problems when running tasks in local mode (SPARK-2975).

This patch fixes both issues: the new `Utils.getOrCreateLocalRootDirs(conf: SparkConf)` utility method manages the creation of local directories and handles the precedence among the different configuration options, so we should see the same behavior whether we're running in local mode or on a worker.

It's kind of a pain to mock out environment variables in tests (no easy way to mock System.getenv), so I added a `private[spark]` method to SparkConf for accessing environment variables (by default, it just delegates to System.getenv). By subclassing SparkConf and overriding this method, we can mock out SPARK_LOCAL_DIRS in tests.

I also fixed a typo in PySpark where we used `SPARK_LOCAL_DIR` instead of `SPARK_LOCAL_DIRS` (I think this was technically innocuous, but it seemed worth fixing).

Author: Josh Rosen <joshrosen@apache.org>

Closes #2002 from JoshRosen/local-dirs and squashes the following commits:

efad8c6 [Josh Rosen] Address review comments:
1dec709 [Josh Rosen] Minor updates to Javadocs.
7f36999 [Josh Rosen] Use env vars to detect if running in YARN container.
399ac25 [Josh Rosen] Update getLocalDir() documentation.
bb3ad89 [Josh Rosen] Remove duplicated YARN getLocalDirs() code.
3e92d44 [Josh Rosen] Move local dirs override logic into Utils; fix bugs:
b2c4736 [Josh Rosen] Add failing tests for SPARK-2974 and SPARK-2975.
007298b [Josh Rosen] Allow environment variables to be mocked in tests.
6d9259b [Josh Rosen] Fix typo in PySpark: SPARK_LOCAL_DIR should be SPARK_LOCAL_DIRS
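For orientation before reading the diff: the sketch below distills the directory-resolution order that `Utils.getOrCreateLocalRootDirs` introduces (YARN-provided directories first, then `SPARK_LOCAL_DIRS`, then `spark.local.dir`, then `java.io.tmpdir`), keeping only paths that exist or can be created. It is a simplified illustration, not the committed code; the names `LocalDirsSketch` and `resolveLocalRootDirs` are made up for the example.

import java.io.File

object LocalDirsSketch {
  // Resolve local root directories with the precedence described above.
  // Only directories that already exist or can be created are kept, which
  // is what keeps Utils.getLocalDir() valid when some entries in
  // spark.local.dir point at nonexistent paths (SPARK-2974).
  def resolveLocalRootDirs(
      yarnLocalDirs: Option[String],     // LOCAL_DIRS / YARN_LOCAL_DIRS from YARN
      sparkLocalDirsEnv: Option[String], // SPARK_LOCAL_DIRS environment variable
      sparkLocalDirConf: Option[String]  // spark.local.dir from SparkConf
    ): Array[String] = {
    val confValue = yarnLocalDirs
      .orElse(sparkLocalDirsEnv)
      .orElse(sparkLocalDirConf)
      .getOrElse(System.getProperty("java.io.tmpdir"))
    confValue.split(',').flatMap { rootDir =>
      val dir = new File(rootDir)
      if (dir.exists || dir.mkdirs()) Some(rootDir) else None
    }
  }

  def main(args: Array[String]): Unit = {
    // A bogus first entry is skipped; the JVM temp dir survives.
    val dirs = resolveLocalRootDirs(
      yarnLocalDirs = None,
      sparkLocalDirsEnv = None,
      sparkLocalDirConf = Some("/NONEXISTENT_PATH," + System.getProperty("java.io.tmpdir")))
    println(dirs.mkString(","))
  }
}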
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                     |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala           |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala              | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala           |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala       | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                     | 67
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala      |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala  |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala         | 61
9 files changed, 144 insertions(+), 43 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b4f321ec99..605df0e929 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -45,7 +45,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
- private val settings = new HashMap[String, String]()
+ private[spark] val settings = new HashMap[String, String]()
if (loadDefaults) {
// Load any spark.* system properties
@@ -210,6 +210,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
new SparkConf(false).setAll(settings)
}
+ /**
+ * By using this instead of System.getenv(), environment variables can be mocked
+ * in unit tests.
+ */
+ private[spark] def getenv(name: String): String = System.getenv(name)
+
/** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
* idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
private[spark] def validateSettings() {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 10210a2927..747023812f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -62,7 +62,7 @@ private[spark] class PythonRDD(
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
- envVars += ("SPARK_LOCAL_DIR" -> localdir) // it's also used in monitor thread
+ envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
// Start a thread to feed the process input from our parent's iterator
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fb3f7bd54b..2f76e532ae 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -62,16 +62,6 @@ private[spark] class Executor(
val conf = new SparkConf(true)
conf.setAll(properties)
- // If we are in yarn mode, systems can have different disk layouts so we must set it
- // to what Yarn on this system said was available. This will be used later when SparkEnv
- // created.
- if (java.lang.Boolean.valueOf(
- System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
- conf.set("spark.local.dir", getYarnLocalDirs())
- } else if (sys.env.contains("SPARK_LOCAL_DIRS")) {
- conf.set("spark.local.dir", sys.env("SPARK_LOCAL_DIRS"))
- }
-
if (!isLocal) {
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
@@ -134,21 +124,6 @@ private[spark] class Executor(
threadPool.shutdown()
}
- /** Get the Yarn approved local directories. */
- private def getYarnLocalDirs(): String = {
- // Hadoop 0.23 and 2.x have different Environment variable names for the
- // local dirs, so lets check both. We assume one of the 2 is set.
- // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
- val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
- .getOrElse(Option(System.getenv("LOCAL_DIRS"))
- .getOrElse(""))
-
- if (localDirs.isEmpty) {
- throw new Exception("Yarn Local dirs can't be empty")
- }
- localDirs
- }
-
class TaskRunner(
execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
extends Runnable {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c0491fb55e..12a92d44f4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -67,8 +67,7 @@ private[spark] class BlockManager(
private val port = conf.getInt("spark.blockManager.port", 0)
val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
- val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
- conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
val connectionManager =
new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index f3da816389..ec022ce9c0 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random, UUID}
-import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.PathResolver
import org.apache.spark.util.Utils
@@ -33,9 +33,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
* However, it is also possible to have a block map to only a segment of a file, by calling
* mapBlockToFileSegment().
*
- * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
+ * Block files are hashed among the directories listed in spark.local.dir (or in
+ * SPARK_LOCAL_DIRS, if it's set).
*/
-private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String)
+private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf)
extends PathResolver with Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@@ -46,7 +47,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
- val localDirs: Array[File] = createLocalDirs()
+ val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -130,10 +131,9 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
(blockId, getFile(blockId))
}
- private def createLocalDirs(): Array[File] = {
- logDebug(s"Creating local directories at root dirs '$rootDirs'")
+ private def createLocalDirs(conf: SparkConf): Array[File] = {
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
- rootDirs.split(",").flatMap { rootDir =>
+ Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
var foundLocalDir = false
var localDir: File = null
var localDirId: String = null
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 019f68b160..d6d74ce269 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -449,12 +449,71 @@ private[spark] object Utils extends Logging {
}
/**
- * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
- * return a single directory, even though the spark.local.dir property might be a list of
- * multiple paths.
+ * Get the path of a temporary directory. Spark's local directories can be configured through
+ * multiple settings, which are used with the following precedence:
+ *
+ * - If called from inside of a YARN container, this will return a directory chosen by YARN.
+ * - If the SPARK_LOCAL_DIRS environment variable is set, this will return a directory from it.
+ * - Otherwise, if the spark.local.dir is set, this will return a directory from it.
+ * - Otherwise, this will return java.io.tmpdir.
+ *
+ * Some of these configuration options might be lists of multiple paths, but this method will
+ * always return a single directory.
*/
def getLocalDir(conf: SparkConf): String = {
- conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+ getOrCreateLocalRootDirs(conf)(0)
+ }
+
+ private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
+ // These environment variables are set by YARN.
+ // For Hadoop 0.23.X, we check for YARN_LOCAL_DIRS (we use this below in getYarnLocalDirs())
+ // For Hadoop 2.X, we check for CONTAINER_ID.
+ conf.getenv("CONTAINER_ID") != null || conf.getenv("YARN_LOCAL_DIRS") != null
+ }
+
+ /**
+ * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
+ * and returns only the directories that exist / could be created.
+ *
+ * If no directories could be created, this will return an empty list.
+ */
+ private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+ val confValue = if (isRunningInYarnContainer(conf)) {
+ // If we are in yarn mode, systems can have different disk layouts so we must set it
+ // to what Yarn on this system said was available.
+ getYarnLocalDirs(conf)
+ } else {
+ Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
+ conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ }
+ val rootDirs = confValue.split(',')
+ logDebug(s"Getting/creating local root dirs at '$confValue'")
+
+ rootDirs.flatMap { rootDir =>
+ val localDir: File = new File(rootDir)
+ val foundLocalDir = localDir.exists || localDir.mkdirs()
+ if (!foundLocalDir) {
+ logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.")
+ None
+ } else {
+ Some(rootDir)
+ }
+ }
+ }
+
+ /** Get the Yarn approved local directories. */
+ private def getYarnLocalDirs(conf: SparkConf): String = {
+ // Hadoop 0.23 and 2.x have different Environment variable names for the
+ // local dirs, so lets check both. We assume one of the 2 is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
+ .getOrElse(Option(conf.getenv("LOCAL_DIRS"))
+ .getOrElse(""))
+
+ if (localDirs.isEmpty) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ localDirs
}
/**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 20bac66105..f32ce6f9fc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -825,8 +825,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val blockManager = mock(classOf[BlockManager])
val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
when(shuffleBlockManager.conf).thenReturn(conf)
- val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
- System.getProperty("java.io.tmpdir"))
+ val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 777579bc57..aabaeadd7a 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -71,7 +71,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
}
override def beforeEach() {
- diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
+ val conf = testConf.clone
+ conf.set("spark.local.dir", rootDirs)
+ diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
shuffleBlockManager.idToSegmentMap.clear()
}
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
new file mode 100644
index 0000000000..dae7bf0e33
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.File
+
+import org.apache.spark.util.Utils
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+
+
+/**
+ * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
+ */
+class LocalDirsSuite extends FunSuite {
+
+ test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
+ // Regression test for SPARK-2974
+ assert(!new File("/NONEXISTENT_DIR").exists())
+ val conf = new SparkConf(false)
+ .set("spark.local.dir", s"/NONEXISTENT_PATH,${System.getProperty("java.io.tmpdir")}")
+ assert(new File(Utils.getLocalDir(conf)).exists())
+ }
+
+ test("SPARK_LOCAL_DIRS override also affects driver") {
+ // Regression test for SPARK-2975
+ assert(!new File("/NONEXISTENT_DIR").exists())
+ // SPARK_LOCAL_DIRS is a valid directory:
+ class MySparkConf extends SparkConf(false) {
+ override def getenv(name: String) = {
+ if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
+ else super.getenv(name)
+ }
+
+ override def clone: SparkConf = {
+ new MySparkConf().setAll(settings)
+ }
+ }
+ // spark.local.dir only contains invalid directories, but that's not a problem since
+ // SPARK_LOCAL_DIRS will override it on both the driver and workers:
+ val conf = new MySparkConf().set("spark.local.dir", "/NONEXISTENT_PATH")
+ assert(new File(Utils.getLocalDir(conf)).exists())
+ }
+
+}