diff options
Diffstat (limited to 'core/src/main/java/org')
3 files changed, 16 insertions, 21 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java index c4aa2669e0..8a09210245 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandlerAdapter; +import org.apache.spark.storage.BlockId; abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter { @@ -33,7 +34,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter { } public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header); - public abstract void handleError(String blockId); + public abstract void handleError(BlockId blockId); @Override public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) { diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index d3d57a0255..172c6e4b1c 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.DefaultFileRegion; +import org.apache.spark.storage.BlockId; +import org.apache.spark.storage.FileSegment; class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> { @@ -34,41 +36,36 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> { } @Override - public void messageReceived(ChannelHandlerContext ctx, String blockId) { - String path = pResolver.getAbsolutePath(blockId); - // if getFilePath returns null, close the channel - if (path == null) { + public void messageReceived(ChannelHandlerContext ctx, String blockIdString) { + BlockId blockId = BlockId.apply(blockIdString); + FileSegment fileSegment = pResolver.getBlockLocation(blockId); + // if getBlockLocation returns null, close the channel + if (fileSegment == null) { //ctx.close(); return; } - File file = new File(path); + File file = fileSegment.file(); if (file.exists()) { if (!file.isFile()) { - //logger.info("Not a file : " + file.getAbsolutePath()); ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); return; } - long length = file.length(); + long length = fileSegment.length(); if (length > Integer.MAX_VALUE || length <= 0) { - //logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length); ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); return; } int len = new Long(length).intValue(); - //logger.info("Sending block "+blockId+" filelen = "+len); - //logger.info("header = "+ (new FileHeader(len, blockId)).buffer()); ctx.write((new FileHeader(len, blockId)).buffer()); try { ctx.sendFile(new DefaultFileRegion(new FileInputStream(file) - .getChannel(), 0, file.length())); + .getChannel(), fileSegment.offset(), fileSegment.length())); } catch (Exception e) { - //logger.warning("Exception when sending file : " + file.getAbsolutePath()); e.printStackTrace(); } } else { - //logger.warning("File not found: " + file.getAbsolutePath()); ctx.write(new FileHeader(0, blockId).buffer()); } ctx.flush(); diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java index 94c034cad0..9f7ced44cf 100755 --- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java +++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java @@ -17,13 +17,10 @@ package org.apache.spark.network.netty;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.FileSegment;
public interface PathResolver {
- /**
- * Get the absolute path of the file
- *
- * @param fileId
- * @return the absolute path of file
- */
- public String getAbsolutePath(String fileId);
+ /** Get the file segment in which the given block resides. */
+ public FileSegment getBlockLocation(BlockId blockId);
}
|