author     zsxwing <zsxwing@gmail.com>      2015-03-26 12:54:48 +0000
committer  Sean Owen <sowen@cloudera.com>   2015-03-26 12:54:48 +0000
commit     0c88ce5416d7687bc806a7655e17009ad5823d30 (patch)
tree       3a40c072c8ac6d0e757e7ba858d5100f94b817ab /core
parent     f88f51bbd461e0a42ad7021147268509b9c3c56e (diff)
[SPARK-6468][Block Manager] Fix the race condition of subDirs in DiskBlockManager
There are two race conditions on `subDirs` in `DiskBlockManager`:

1. `getAllFiles` does not take the correct locks when it reads the contents of `subDirs`. Although it is designed for testing, it is still worth adding the correct locks to eliminate the race condition.
2. The double-checked locking in `getFile(filename: String)` is racy. If a thread sees a non-null `subDirs(dirId)(subDirId)` outside the `synchronized` block, the Java memory model does not guarantee that it also sees the fully constructed `File` instance that `subDirs(dirId)(subDirId)` points to, because no volatile variable is involved.

This PR fixes both race conditions.

Author: zsxwing <zsxwing@gmail.com>

Closes #5136 from zsxwing/SPARK-6468 and squashes the following commits:

cbb872b [zsxwing] Fix the race condition of subDirs in DiskBlockManager
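To make the second point concrete, here is a minimal Scala sketch of the broken pattern (hypothetical names; not the actual `DiskBlockManager` code). The unsynchronized fast-path read has no happens-before relationship with the write performed under the lock, so a reader can observe the reference before the object it points to is safely published:

```scala
import java.io.File

object BrokenDoubleCheck {
  // A plain (non-volatile) array of mutable slots, like subDirs(i).
  private val slots = new Array[File](16)

  def get(i: Int): File = {
    // Unsynchronized fast path: this read races with the write below.
    // Even a non-null result is not guaranteed to be safely published.
    var f = slots(i)
    if (f == null) {
      f = slots.synchronized {
        val old = slots(i)
        if (old != null) {
          old
        } else {
          val created = new File("%02x".format(i))
          slots(i) = created
          created
        }
      }
    }
    f
  }
}
```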
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 32
1 file changed, 18 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 12cd8ea3bd..2883137872 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,6 +47,8 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
+  // The content of subDirs is immutable, but the content of subDirs(i) is mutable,
+  // and it is protected by the lock of subDirs(i).
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
   private val shutdownHook = addShutdownHook()
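The added comment pins down the locking convention: the outer array reference never changes, while each inner array is both the mutable state and the monitor that guards it, so contention is striped across `localDirs.length` independent locks. A small illustrative sketch (hypothetical names, not Spark code):

```scala
import java.io.File

object StripedLocks {
  // Array.fill evaluates its argument once per element, so each outer
  // slot holds a distinct inner array, and each inner array doubles as
  // the monitor guarding its own contents.
  val buckets: Array[Array[File]] = Array.fill(4)(new Array[File](8))

  def main(args: Array[String]): Unit = {
    // Distinct objects, hence distinct locks: synchronizing on
    // buckets(0) never blocks a thread synchronizing on buckets(1).
    assert(buckets(0) ne buckets(1))
  }
}
```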
@@ -61,20 +63,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
 
     // Create the subdirectory if it doesn't already exist
-    var subDir = subDirs(dirId)(subDirId)
-    if (subDir == null) {
-      subDir = subDirs(dirId).synchronized {
-        val old = subDirs(dirId)(subDirId)
-        if (old != null) {
-          old
-        } else {
-          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-          if (!newDir.exists() && !newDir.mkdir()) {
-            throw new IOException(s"Failed to create local dir in $newDir.")
-          }
-          subDirs(dirId)(subDirId) = newDir
-          newDir
+    val subDir = subDirs(dirId).synchronized {
+      val old = subDirs(dirId)(subDirId)
+      if (old != null) {
+        old
+      } else {
+        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+        if (!newDir.exists() && !newDir.mkdir()) {
+          throw new IOException(s"Failed to create local dir in $newDir.")
         }
+        subDirs(dirId)(subDirId) = newDir
+        newDir
       }
     }
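As a sanity check on the fixed shape, the pattern can be exercised in isolation. The sketch below is hypothetical test code (names like `createOnce` and `LockPerBucketDemo` are not from the patch): eight threads race on the same slot, and because every read and write happens under the inner array's lock, exactly one creation is observed:

```scala
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object LockPerBucketDemo {
  private val slots = Array.fill(4)(new Array[File](8))
  private val creations = new AtomicInteger(0)

  // Same shape as the fixed getFile: every read and write of a slot
  // happens while holding the lock of its enclosing inner array.
  def createOnce(dirId: Int, subDirId: Int): File = slots(dirId).synchronized {
    val old = slots(dirId)(subDirId)
    if (old != null) old
    else {
      creations.incrementAndGet()
      val newDir = new File("/tmp", s"demo-$dirId-$subDirId")
      slots(dirId)(subDirId) = newDir
      newDir
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(8)
    val start = new CountDownLatch(1)
    (1 to 8).foreach { _ =>
      pool.execute(new Runnable {
        def run(): Unit = { start.await(); createOnce(1, 3) }
      })
    }
    start.countDown()
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    // All eight racing threads observe exactly one creation.
    assert(creations.get() == 1)
  }
}
```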
@@ -91,7 +90,12 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   /** List all the files currently stored on disk by the disk manager. */
   def getAllFiles(): Seq[File] = {
     // Get all the files inside the array of arrays of directories
-    subDirs.flatten.filter(_ != null).flatMap { dir =>
+    subDirs.flatMap { dir =>
+      dir.synchronized {
+        // Copy the content of dir because it may be modified by other threads
+        dir.clone()
+      }
+    }.filter(_ != null).flatMap { dir =>
       val files = dir.listFiles()
       if (files != null) files else Seq.empty
     }
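The `getAllFiles` change is an instance of the snapshot-under-lock pattern: copy the shared array while holding its lock, then iterate over the private copy without blocking writers. A minimal standalone sketch (hypothetical names, not Spark code):

```scala
import java.io.File

object SnapshotUnderLock {
  private val slots = Array.fill(4)(new Array[File](8))

  // Writers mutate a slot only while holding the inner array's lock.
  def put(dirId: Int, subDirId: Int, f: File): Unit =
    slots(dirId).synchronized { slots(dirId)(subDirId) = f }

  // Readers clone each inner array under the same lock, then work on the
  // copy; any slow follow-up work runs on the snapshot and never blocks
  // concurrent writers.
  def all(): Seq[File] =
    slots.toSeq.flatMap { dir =>
      dir.synchronized { dir.clone() }
    }.filter(_ != null)
}
```

This keeps lock hold times short: only the small array copy happens under the lock, while the expensive `listFiles()` calls in the real `getAllFiles` run on the snapshot.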