aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-09-28 19:00:50 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-09-28 19:00:50 -0700
commite54e1d70434e9be32ae1f5e91808b2fd173b761e (patch)
tree32ba5f6718fcf4efd42830bedd4ec502f49131c0 /core/src/main
parentae8c7d6cfaa3196edef03d10e7f84c3d5e6193c5 (diff)
downloadspark-e54e1d70434e9be32ae1f5e91808b2fd173b761e.tar.gz
spark-e54e1d70434e9be32ae1f5e91808b2fd173b761e.tar.bz2
spark-e54e1d70434e9be32ae1f5e91808b2fd173b761e.zip
Made subdirs per local dir configurable, and reduced lock usage a bit
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala27
1 files changed, 15 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 3845cb5d0e..e912bc985b 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -14,13 +14,13 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
extends BlockStore(blockManager) {
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
- val SUBDIRS_PER_LOCAL_DIR = 128
+ val subDirsPerLocalDir = System.getProperty("spark.diskStore.subdirs", "64").toInt
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
// having really large inodes at the top level.
val localDirs = createLocalDirs()
- val subDirs = Array.fill(localDirs.length)(new Array[File](SUBDIRS_PER_LOCAL_DIR))
+ val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
addShutdownHook()
@@ -92,18 +92,21 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = math.abs(blockId.hashCode)
val dirId = hash % localDirs.length
- val subDirId = (hash / localDirs.length) % SUBDIRS_PER_LOCAL_DIR
+ val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
- val subDir = subDirs(dirId).synchronized {
- val old = subDirs(dirId)(subDirId)
- if (old != null) {
- old
- } else {
- val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
- newDir.mkdir()
- subDirs(dirId)(subDirId) = newDir
- newDir
+ var subDir = subDirs(dirId)(subDirId)
+ if (subDir == null) {
+ subDir = subDirs(dirId).synchronized {
+ val old = subDirs(dirId)(subDirId)
+ if (old != null) {
+ old
+ } else {
+ val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+ newDir.mkdir()
+ subDirs(dirId)(subDirId) = newDir
+ newDir
+ }
}
}