author	Aaron Davidson <aaron@databricks.com>	2014-08-01 13:57:19 -0700
committer	Matei Zaharia <matei@databricks.com>	2014-08-01 13:57:19 -0700
commit	78f2af582286b81e6dc9fa9d455ed2b369d933bd
tree	d1b31d36d51ef5c6442ff8f6d19434c09d61f11c /tools
parent	b270309d7608fb749e402cd5afd36087446be398
SPARK-2791: Fix committing, reverting and state tracking in shuffle file consolidation
All changes from this PR are by mridulm and are drawn from his work in #1609. This patch is intended to fix all major issues related to shuffle file consolidation that mridulm found, while minimizing changes to the code, with the hope that it may be more easily merged into 1.1.

This patch is **not** intended as a replacement for #1609, which provides many additional benefits, including fixes to ExternalAppendOnlyMap, improvements to DiskBlockObjectWriter's API, and several new unit tests. If it is feasible to merge #1609 for the 1.1 deadline, that is a preferable option.

Author: Aaron Davidson <aaron@databricks.com>

Closes #1678 from aarondav/consol and squashes the following commits:

53b3f6d [Aaron Davidson] Correct behavior when writing unopened file
701d045 [Aaron Davidson] Rebase with sort-based shuffle
9160149 [Aaron Davidson] SPARK-2532: Minimal shuffle consolidation fixes
Diffstat (limited to 'tools')
-rw-r--r--  tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 5
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8e8c35615a..8a05fcb449 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -61,10 +61,9 @@ object StoragePerfTester {
       for (i <- 1 to recordsPerMap) {
         writers(i % numOutputSplits).write(writeData)
       }
-      writers.map {w =>
-        w.commit()
+      writers.map { w =>
+        w.commitAndClose()
         total.addAndGet(w.fileSegment().length)
-        w.close()
       }
       shuffle.releaseWriters(true)
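The visible change in this hunk is the core idea of the fix: the separate `commit()` and `close()` calls are fused into one `commitAndClose()`, so the writer's committed length is fixed at the moment it closes and there is no window in which a committed writer can still be written to or left open. The sketch below illustrates that state tracking, including the "writing unopened file" case from commit 53b3f6d; `SketchWriter` and its members are hypothetical names for illustration, not Spark's actual `DiskBlockObjectWriter` API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a disk block writer; not Spark's real class.
class SketchWriter {
  private var opened = false          // becomes true on first write
  private var closed = false          // true after commitAndClose()
  private var committedLength = 0L    // length visible to readers after commit
  private val buffer = new ArrayBuffer[Byte]

  def write(data: Array[Byte]): Unit = {
    require(!closed, "cannot write after commitAndClose()")
    opened = true
    buffer ++= data
  }

  // Commit and close as a single step. A writer that never opened its
  // file commits a zero-length segment instead of failing.
  def commitAndClose(): Unit = {
    if (!closed) {
      committedLength = if (opened) buffer.length.toLong else 0L
      closed = true
    }
  }

  // Analogous to fileSegment().length in the diff above.
  def fileSegmentLength: Long = committedLength
}
```

Usage mirrors the loop in `StoragePerfTester`: each writer gets `commitAndClose()` exactly once, and only then is its segment length read and accumulated.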