aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2013-10-20 02:30:23 -0700
committerAaron Davidson <aaron@databricks.com>2013-10-20 02:58:26 -0700
commit136b9b3a3ed358bc04b28e8d62657d56d55c2c3e (patch)
tree0d8451be862e2485fcc5d8013a34307e8192ff70 /core/src/main/java
parent861dc409d7209c3a8d4518708016d1b843f5c52b (diff)
downloadspark-136b9b3a3ed358bc04b28e8d62657d56d55c2c3e.tar.gz
spark-136b9b3a3ed358bc04b28e8d62657d56d55c2c3e.tar.bz2
spark-136b9b3a3ed358bc04b28e8d62657d56d55c2c3e.zip
Basic shuffle file consolidation
The Spark shuffle phase can produce a large number of files, as one file is created per mapper per reducer. For large or repeated jobs, this often produces millions of shuffle files, which sees extremely degredaded performance from the OS file system. This patch seeks to reduce that burden by combining multipe shuffle files into one. This PR draws upon the work of Jason Dai in https://github.com/mesos/spark/pull/669. However, it simplifies the design in order to get the majority of the gain with less overall intellectual and code burden. The vast majority of code in this pull request is a refactor to allow the insertion of a clean layer of indirection between logical block ids and physical files. This, I feel, provides some design clarity in addition to enabling shuffle file consolidation. The main goal is to produce one shuffle file per reducer per active mapper thread. This allows us to isolate the mappers (simplifying the failure modes), while still allowing us to reduce the number of mappers tremendously for large tasks. In order to accomplish this, we simply create a new set of shuffle files for every parallel task, and return the files to a pool which will be given out to the next run task.
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java2
1 files changed, 1 insertions, 1 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index ab790b7850..172c6e4b1c 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -51,7 +51,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
ctx.flush();
return;
}
- long length = file.length();
+ long length = fileSegment.length();
if (length > Integer.MAX_VALUE || length <= 0) {
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();