aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala32
1 files changed, 18 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 12cd8ea3bd..2883137872 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,6 +47,8 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
+ // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
+ // of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private val shutdownHook = addShutdownHook()
@@ -61,20 +63,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
- var subDir = subDirs(dirId)(subDirId)
- if (subDir == null) {
- subDir = subDirs(dirId).synchronized {
- val old = subDirs(dirId)(subDirId)
- if (old != null) {
- old
- } else {
- val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
- if (!newDir.exists() && !newDir.mkdir()) {
- throw new IOException(s"Failed to create local dir in $newDir.")
- }
- subDirs(dirId)(subDirId) = newDir
- newDir
+ val subDir = subDirs(dirId).synchronized {
+ val old = subDirs(dirId)(subDirId)
+ if (old != null) {
+ old
+ } else {
+ val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+ if (!newDir.exists() && !newDir.mkdir()) {
+ throw new IOException(s"Failed to create local dir in $newDir.")
}
+ subDirs(dirId)(subDirId) = newDir
+ newDir
}
}
@@ -91,7 +90,12 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
/** List all the files currently stored on disk by the disk manager. */
def getAllFiles(): Seq[File] = {
// Get all the files inside the array of array of directories
- subDirs.flatten.filter(_ != null).flatMap { dir =>
+ subDirs.flatMap { dir =>
+ dir.synchronized {
+ // Copy the content of dir because it may be modified in other threads
+ dir.clone()
+ }
+ }.filter(_ != null).flatMap { dir =>
val files = dir.listFiles()
if (files != null) files else Seq.empty
}