aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2013-10-19 16:25:08 -0700
committerAaron Davidson <aaron@databricks.com>2013-10-20 02:48:41 -0700
commit861dc409d7209c3a8d4518708016d1b843f5c52b (patch)
tree963ce5d73c45aebb2bb86b9adf07abe3d4290394 /core/src/main/java
parent747f53892546eb8edf1c75788c9ecf9c939d375c (diff)
downloadspark-861dc409d7209c3a8d4518708016d1b843f5c52b.tar.gz
spark-861dc409d7209c3a8d4518708016d1b843f5c52b.tar.bz2
spark-861dc409d7209c3a8d4518708016d1b843f5c52b.zip
Refactor of DiskStore for shuffle file consolidation
The main goal of this refactor was to allow the interposition of a new layer which maps logical BlockIds to physical locations other than a file with the same name as the BlockId. In particular, BlockIds will need to be mappable to chunks of files, as multiple will be stored in the same file. In order to accomplish this, the following changes have been made: - Creation of DiskBlockManager, which manages the association of logical BlockIds to physical disk locations (called FileSegments). By default, Blocks are simply mapped to physical files of the same name, as before. - The DiskStore now indirects all requests for a given BlockId through the DiskBlockManager in order to resolve the actual File location. - DiskBlockObjectWriter has been merged into BlockObjectWriter. - The Netty PathResolver has been changed to map BlockIds into FileSegments, as this codepath is the only one that uses Netty, and that is likely to remain the case. Overall, I think this refactor produces a clearer division between the logical Block paradigm and their physical on-disk location. There is now an explicit (and documented) mapping from one to the other.
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java17
-rwxr-xr-xcore/src/main/java/org/apache/spark/network/netty/PathResolver.java11
2 files changed, 10 insertions, 18 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index cfd8132891..ab790b7850 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.DefaultFileRegion;
import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.FileSegment;
class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
@@ -37,40 +38,34 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
@Override
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
BlockId blockId = BlockId.apply(blockIdString);
- String path = pResolver.getAbsolutePath(blockId.name());
- // if getFilePath returns null, close the channel
- if (path == null) {
+ FileSegment fileSegment = pResolver.getBlockLocation(blockId);
+ // if getBlockLocation returns null, close the channel
+ if (fileSegment == null) {
//ctx.close();
return;
}
- File file = new File(path);
+ File file = fileSegment.file();
if (file.exists()) {
if (!file.isFile()) {
- //logger.info("Not a file : " + file.getAbsolutePath());
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
return;
}
long length = file.length();
if (length > Integer.MAX_VALUE || length <= 0) {
- //logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length);
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
return;
}
int len = new Long(length).intValue();
- //logger.info("Sending block "+blockId+" filelen = "+len);
- //logger.info("header = "+ (new FileHeader(len, blockId)).buffer());
ctx.write((new FileHeader(len, blockId)).buffer());
try {
ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
- .getChannel(), 0, file.length()));
+ .getChannel(), fileSegment.offset(), fileSegment.length()));
} catch (Exception e) {
- //logger.warning("Exception when sending file : " + file.getAbsolutePath());
e.printStackTrace();
}
} else {
- //logger.warning("File not found: " + file.getAbsolutePath());
ctx.write(new FileHeader(0, blockId).buffer());
}
ctx.flush();
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
index 94c034cad0..370fcdeea9 100755
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
@@ -17,13 +17,10 @@
package org.apache.spark.network.netty;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.FileSegment;
public interface PathResolver {
- /**
- * Get the absolute path of the file
- *
- * @param fileId
- * @return the absolute path of file
- */
- public String getAbsolutePath(String fileId);
+ /** Get the file segment in which the given Block resides. */
+ public FileSegment getBlockLocation(BlockId blockId);
}