From 4e982364426c7d65032e8006c63ca4f9a0d40470 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 6 Aug 2014 13:10:33 -0700 Subject: SPARK-2566. Update ShuffleWriteMetrics incrementally I haven't tested this out on a cluster yet, but wanted to make sure the approach (passing ShuffleWriteMetrics down to DiskBlockObjectWriter) was ok Author: Sandy Ryza Closes #1481 from sryza/sandy-spark-2566 and squashes the following commits: 8090d88 [Sandy Ryza] Fix ExternalSorter b2a62ed [Sandy Ryza] Fix more test failures 8be6218 [Sandy Ryza] Fix test failures and mark a couple variables private c5e68e5 [Sandy Ryza] SPARK-2566. Update ShuffleWriteMetrics incrementally --- tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'tools/src/main/scala') diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index 8a05fcb449..17bf7c2541 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong import org.apache.spark.SparkContext import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils +import org.apache.spark.executor.ShuffleWriteMetrics /** * Internal utility for micro-benchmarking shuffle write performance. @@ -56,7 +57,7 @@ object StoragePerfTester { def writeOutputBytes(mapId: Int, total: AtomicLong) = { val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, - new KryoSerializer(sc.conf)) + new KryoSerializer(sc.conf), new ShuffleWriteMetrics()) val writers = shuffle.writers for (i <- 1 to recordsPerMap) { writers(i % numOutputSplits).write(writeData) -- cgit v1.2.3