aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2015-02-12 14:46:37 -0800
committerAndrew Or <andrew@databricks.com>2015-02-12 14:46:45 -0800
commit0040fc50918cf5e53554b0dc8053528af58e6ba8 (patch)
treeabc1915cbadaeee0ad388d460f47f94406225532
parent9a1de4b20fcfa756f228b263f2a778534f6ca90d (diff)
downloadspark-0040fc50918cf5e53554b0dc8053528af58e6ba8.tar.gz
spark-0040fc50918cf5e53554b0dc8053528af58e6ba8.tar.bz2
spark-0040fc50918cf5e53554b0dc8053528af58e6ba8.zip
[SPARK-5762] Fix shuffle write time for sort-based shuffle
mateiz was excluding the time to write this final file from the shuffle write time intentional? Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #4559 from kayousterhout/SPARK-5762 and squashes the following commits: 5c6f3d9 [Kay Ousterhout] Use foreach 94e4237 [Kay Ousterhout] Removed open time metrics added inadvertently ace156c [Kay Ousterhout] Moved metrics to finally block d773276 [Kay Ousterhout] Use nano time 5a59906 [Kay Ousterhout] [SPARK-5762] Fix shuffle write time for sort-based shuffle (cherry picked from commit 47c73d410ab533c3196184d2b6004081e79daeaa) Signed-off-by: Andrew Or <andrew@databricks.com>
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala3
1 files changed, 3 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index eaec5a71e6..d69f2d9048 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -723,6 +723,7 @@ private[spark] class ExternalSorter[K, V, C](
partitionWriters.foreach(_.commitAndClose())
var out: FileOutputStream = null
var in: FileInputStream = null
+ val writeStartTime = System.nanoTime
try {
out = new FileOutputStream(outputFile, true)
for (i <- 0 until numPartitions) {
@@ -739,6 +740,8 @@ private[spark] class ExternalSorter[K, V, C](
if (in != null) {
in.close()
}
+ context.taskMetrics.shuffleWriteMetrics.foreach(
+ _.incShuffleWriteTime(System.nanoTime - writeStartTime))
}
} else {
// Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by