author     Aaron Davidson <aaron@databricks.com>  2013-10-20 16:10:40 -0700
committer  Aaron Davidson <aaron@databricks.com>  2013-10-20 16:11:59 -0700
commit     42a049723d92d8a7f87fae0a305f8933cb0f7374 (patch)
tree       40d9fcc6647fc3110206370e636257b440fda171 /core/src
parent     38b8048f291dd42ee996e75bd1b6d33aa24b1a5e (diff)
Address Josh and Reynold's comments
Diffstat (limited to 'core/src')
-rwxr-xr-x  core/src/main/java/org/apache/spark/network/netty/PathResolver.java      2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala     4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala     12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala             5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/FileSegment.scala           4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala   8
6 files changed, 17 insertions, 18 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
index 370fcdeea9..9f7ced44cf 100755
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
@@ -21,6 +21,6 @@ import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;
public interface PathResolver {
- /** Get the file segment in which the given Block resides. */
+ /** Get the file segment in which the given block resides. */
public FileSegment getBlockLocation(BlockId blockId);
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 0b5a472999..6e4382d71e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
*
* This interface does not support concurrent writes.
*/
-abstract class BlockObjectWriter(val blockId: BlockId) {
+private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
var closeEventHandler: () => Unit = _
@@ -72,7 +72,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
}
/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
-class DiskBlockObjectWriter(
+private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
file: File,
serializer: Serializer,
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 6ace4eb521..ecbd9c2ff7 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -22,20 +22,20 @@ import java.text.SimpleDateFormat
import java.util.{Date, Random}
import java.util.concurrent.ConcurrentHashMap
-import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.Logging
+import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
import org.apache.spark.util.Utils
/**
- * Creates an maintains the logical mapping between logical Blocks and physical on-disk
- * locations. By default, one Block is mapped to one file with a name given by its BlockId.
- * However, it is also possible to have a Block map to only a segment of a file, by calling
+ * Creates and maintains the logical mapping between logical blocks and physical on-disk
+ * locations. By default, one block is mapped to one file with a name given by its BlockId.
+ * However, it is also possible to have a block map to only a segment of a file, by calling
* mapBlockToFileSegment().
*
- * @param rootDirs The directories to use for storing Block files. Data will be hashed among these.
+ * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
*/
-class DiskBlockManager(rootDirs: String) extends PathResolver with Logging {
+private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
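
The scaladoc in the hunk above describes the DiskBlockManager contract: by default a block maps to a whole file named after its BlockId, but a block can also be mapped to a segment of a shared file. As a rough, self-contained sketch of that idea (not the actual Spark code; BlockId, FileSegment, and SimpleDiskResolver below are simplified, hypothetical stand-ins), a PathResolver-style mapping might look like:

import java.io.File
import scala.collection.concurrent.TrieMap

// Simplified stand-ins for the Spark types referenced in the diff above.
case class BlockId(name: String)
class FileSegment(val file: File, val offset: Long, val length: Long)

trait PathResolver {
  /** Get the file segment in which the given block resides. */
  def getBlockLocation(blockId: BlockId): FileSegment
}

// Hypothetical resolver: a block maps to a whole file named after its BlockId
// unless it has been explicitly mapped to a segment of a shared file.
class SimpleDiskResolver(rootDir: File) extends PathResolver {
  private val segments = TrieMap.empty[BlockId, FileSegment]

  def mapBlockToFileSegment(blockId: BlockId, segment: FileSegment): Unit =
    segments.put(blockId, segment)

  override def getBlockLocation(blockId: BlockId): FileSegment =
    segments.getOrElse(blockId, {
      val file = new File(rootDir, blockId.name)
      new FileSegment(file, 0, file.length())
    })
}
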
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index e703a3329c..a3c496f9e0 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -86,7 +86,6 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
val segment = diskManager.getBlockLocation(blockId)
val channel = new RandomAccessFile(segment.file, "r").getChannel()
val buffer = try {
- logWarning("<READ: " + segment.offset + ", " + segment.length + ">")
channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
} finally {
channel.close()
@@ -106,13 +105,13 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
}
- override def remove(blockId: BlockId) = {
+ override def remove(blockId: BlockId): Boolean = {
val fileSegment = diskManager.getBlockLocation(blockId)
val file = fileSegment.file
if (file.exists() && file.length() == fileSegment.length) {
file.delete()
} else {
- if (file.length() < fileSegment.length) {
+ if (fileSegment.length < file.length()) {
logWarning("Could not delete block associated with only a part of a file: " + blockId)
}
false
diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
index 9947053e30..555486830a 100644
--- a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
@@ -23,6 +23,6 @@ import java.io.File
* References a particular segment of a file (potentially the entire file),
* based off an offset and a length.
*/
-class FileSegment(val file: File, val offset: Long, val length : Long) {
+private[spark] class FileSegment(val file: File, val offset: Long, val length : Long) {
override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length)
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 6208856e56..31849eb587 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -36,7 +36,7 @@ trait ShuffleBlocks {
* per reducer.
*
* As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * Blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
+ * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
* per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
* it releases them for another task.
* Regarding the implementation of this feature, shuffle files are identified by a 4-tuple:
@@ -49,7 +49,8 @@ trait ShuffleBlocks {
*/
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) {
- /** Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. */
+ // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
+ // TODO: Remove this once the shuffle file consolidation feature is stable.
val consolidateShuffleFiles =
System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean
@@ -78,8 +79,7 @@ class ShuffleBlockManager(blockManager: BlockManager) {
private def getUnusedFileId(): Int = {
val fileId = unusedFileIds.poll()
- if (fileId == null) nextFileId.getAndIncrement()
- else fileId
+ if (fileId == null) nextFileId.getAndIncrement() else fileId
}
private def recycleFileId(fileId: Int) {