diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-08-06 13:10:33 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-08-06 13:10:33 -0700 |
commit | 4e982364426c7d65032e8006c63ca4f9a0d40470 (patch) | |
tree | 2ec42e3d702ad5c879ba0ec47face40e3a1fb573 /tools | |
parent | d614967b0bad1e6c5277d612602ec0a653a00258 (diff) | |
download | spark-4e982364426c7d65032e8006c63ca4f9a0d40470.tar.gz spark-4e982364426c7d65032e8006c63ca4f9a0d40470.tar.bz2 spark-4e982364426c7d65032e8006c63ca4f9a0d40470.zip |
SPARK-2566. Update ShuffleWriteMetrics incrementally
I haven't tested this out on a cluster yet, but wanted to make sure the approach (passing ShuffleWriteMetrics down to DiskBlockObjectWriter) was ok
Author: Sandy Ryza <sandy@cloudera.com>
Closes #1481 from sryza/sandy-spark-2566 and squashes the following commits:
8090d88 [Sandy Ryza] Fix ExternalSorter
b2a62ed [Sandy Ryza] Fix more test failures
8be6218 [Sandy Ryza] Fix test failures and mark a couple variables private
c5e68e5 [Sandy Ryza] SPARK-2566. Update ShuffleWriteMetrics incrementally
Diffstat (limited to 'tools')
-rw-r--r-- | tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 3 |
1 files changed, 2 insertions, 1 deletions
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index 8a05fcb449..17bf7c2541 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong import org.apache.spark.SparkContext import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils +import org.apache.spark.executor.ShuffleWriteMetrics /** * Internal utility for micro-benchmarking shuffle write performance. @@ -56,7 +57,7 @@ object StoragePerfTester { def writeOutputBytes(mapId: Int, total: AtomicLong) = { val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, - new KryoSerializer(sc.conf)) + new KryoSerializer(sc.conf), new ShuffleWriteMetrics()) val writers = shuffle.writers for (i <- 1 to recordsPerMap) { writers(i % numOutputSplits).write(writeData) |