author     Aaron Davidson <aaron@databricks.com>    2013-10-20 16:10:40 -0700
committer  Aaron Davidson <aaron@databricks.com>    2013-10-20 16:11:59 -0700
commit     42a049723d92d8a7f87fae0a305f8933cb0f7374 (patch)
tree       40d9fcc6647fc3110206370e636257b440fda171 /core/src
parent     38b8048f291dd42ee996e75bd1b6d33aa24b1a5e (diff)
Address Josh and Reynold's comments
Diffstat (limited to 'core/src')
6 files changed, 17 insertions, 18 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
index 370fcdeea9..9f7ced44cf 100755
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
@@ -21,6 +21,6 @@ import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.FileSegment;
 
 public interface PathResolver {
-  /** Get the file segment in which the given Block resides. */
+  /** Get the file segment in which the given block resides. */
   public FileSegment getBlockLocation(BlockId blockId);
 }
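For context, PathResolver is how the Netty shuffle path turns a BlockId into a concrete on-disk location; DiskBlockManager below is the implementor touched by this patch. As a rough illustration only (WholeFileResolver is a hypothetical name, not part of this commit), a resolver that maps every block to its own whole file might look like:

```scala
import java.io.File

import org.apache.spark.network.netty.PathResolver
import org.apache.spark.storage.{BlockId, FileSegment}

// Hypothetical resolver, for illustration only: each block lives in its own
// file named after the BlockId, so the segment spans the entire file.
class WholeFileResolver(rootDir: File) extends PathResolver {
  override def getBlockLocation(blockId: BlockId): FileSegment = {
    val file = new File(rootDir, blockId.name)
    new FileSegment(file, 0, file.length())
  }
}
```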
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 0b5a472999..6e4382d71e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  *
  * This interface does not support concurrent writes.
  */
-abstract class BlockObjectWriter(val blockId: BlockId) {
+private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
   var closeEventHandler: () => Unit = _
 
@@ -72,7 +72,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
 }
 
 /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
-class DiskBlockObjectWriter(
+private[spark] class DiskBlockObjectWriter(
     blockId: BlockId,
     file: File,
     serializer: Serializer,
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 6ace4eb521..ecbd9c2ff7 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -22,20 +22,20 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Random}
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.Logging
+import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
 import org.apache.spark.util.Utils
 
 /**
- * Creates an maintains the logical mapping between logical Blocks and physical on-disk
- * locations. By default, one Block is mapped to one file with a name given by its BlockId.
- * However, it is also possible to have a Block map to only a segment of a file, by calling
+ * Creates and maintains the logical mapping between logical blocks and physical on-disk
+ * locations. By default, one block is mapped to one file with a name given by its BlockId.
+ * However, it is also possible to have a block map to only a segment of a file, by calling
  *   mapBlockToFileSegment().
  *
- * @param rootDirs The directories to use for storing Block files. Data will be hashed among these.
+ * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
  */
-class DiskBlockManager(rootDirs: String) extends PathResolver with Logging {
+private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index e703a3329c..a3c496f9e0 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -86,7 +86,6 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     val segment = diskManager.getBlockLocation(blockId)
     val channel = new RandomAccessFile(segment.file, "r").getChannel()
     val buffer = try {
-      logWarning("<READ: " + segment.offset + ", " + segment.length + ">")
       channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
     } finally {
       channel.close()
@@ -106,13 +105,13 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
   }
 
-  override def remove(blockId: BlockId) = {
+  override def remove(blockId: BlockId): Boolean = {
     val fileSegment = diskManager.getBlockLocation(blockId)
     val file = fileSegment.file
     if (file.exists() && file.length() == fileSegment.length) {
       file.delete()
     } else {
-      if (file.length() < fileSegment.length) {
+      if (fileSegment.length < file.length()) {
         logWarning("Could not delete block associated with only a part of a file: " + blockId)
       }
       false
diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
index 9947053e30..555486830a 100644
--- a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
@@ -23,6 +23,6 @@ import java.io.File
 * References a particular segment of a file (potentially the entire file),
 * based off an offset and a length.
 */
-class FileSegment(val file: File, val offset: Long, val length : Long) {
+private[spark] class FileSegment(val file: File, val offset: Long, val length : Long) {
   override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length)
-}
\ No newline at end of file
+}
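The one behavioral fix in this commit is the inverted comparison in DiskStore.remove above: the warning should fire when the block's segment is shorter than the file that contains it (a consolidated file shared with other blocks), not the other way around. A standalone sketch of that guard, using a hypothetical SegmentLike stand-in rather than Spark's FileSegment:

```scala
import java.io.File

// Hypothetical stand-in for FileSegment, for illustration only.
case class SegmentLike(file: File, offset: Long, length: Long)

// Mirrors the corrected guard: delete the file only when the segment covers
// all of it; if the segment is a strict subset, the file also holds other
// blocks' data, so deleting it would be destructive. Warn and report failure.
def removeSegment(segment: SegmentLike): Boolean = {
  val file = segment.file
  if (file.exists() && file.length() == segment.length) {
    file.delete()
  } else {
    if (segment.length < file.length()) {
      println("Could not delete block stored in only a part of a file: " + file.getName)
    }
    false
  }
}
```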
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 6208856e56..31849eb587 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -36,7 +36,7 @@ trait ShuffleBlocks {
 * per reducer.
 *
 * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * Blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
+ * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
 * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
 * it releases them for another task.
 * Regarding the implementation of this feature, shuffle files are identified by a 4-tuple:
@@ -49,7 +49,8 @@ trait ShuffleBlocks {
 */
 private[spark]
 class ShuffleBlockManager(blockManager: BlockManager) {
 
-  /** Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. */
+  // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
+  // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
     System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean
 
@@ -78,8 +79,7 @@ class ShuffleBlockManager(blockManager: BlockManager) {
 
   private def getUnusedFileId(): Int = {
     val fileId = unusedFileIds.poll()
-    if (fileId == null) nextFileId.getAndIncrement()
-    else fileId
+    if (fileId == null) nextFileId.getAndIncrement() else fileId
   }
 
   private def recycleFileId(fileId: Int) {
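getUnusedFileId and recycleFileId above form a simple id pool: a finished shuffle task returns its file id so a later task can append to the same physical shuffle files. A stripped-down sketch of that pattern (FileIdPool is a hypothetical name; the real manager holds more state):

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, self-contained model of the id-recycling pattern in
// ShuffleBlockManager; only the two methods shown in the diff are mirrored.
class FileIdPool {
  private val nextFileId = new AtomicInteger(0)
  private val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()

  def getUnusedFileId(): Int = {
    val fileId = unusedFileIds.poll()
    // poll() returns null when the pool is empty; mint a fresh id in that case.
    if (fileId == null) nextFileId.getAndIncrement() else fileId
  }

  def recycleFileId(fileId: Int) {
    unusedFileIds.add(fileId)
  }
}
```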