about summary refs log tree commit diff
path: root/tools
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-08-06 13:10:33 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-08-06 13:10:33 -0700
commit4e982364426c7d65032e8006c63ca4f9a0d40470 (patch)
tree2ec42e3d702ad5c879ba0ec47face40e3a1fb573 /tools
parentd614967b0bad1e6c5277d612602ec0a653a00258 (diff)
downloadspark-4e982364426c7d65032e8006c63ca4f9a0d40470.tar.gz
spark-4e982364426c7d65032e8006c63ca4f9a0d40470.tar.bz2
spark-4e982364426c7d65032e8006c63ca4f9a0d40470.zip
SPARK-2566. Update ShuffleWriteMetrics incrementally
I haven't tested this out on a cluster yet, but wanted to make sure the approach (passing ShuffleWriteMetrics down to DiskBlockObjectWriter) was ok. Author: Sandy Ryza <sandy@cloudera.com> Closes #1481 from sryza/sandy-spark-2566 and squashes the following commits: 8090d88 [Sandy Ryza] Fix ExternalSorter b2a62ed [Sandy Ryza] Fix more test failures 8be6218 [Sandy Ryza] Fix test failures and mark a couple variables private c5e68e5 [Sandy Ryza] SPARK-2566. Update ShuffleWriteMetrics incrementally
Diffstat (limited to 'tools')
-rw-r--r-- tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 3
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8a05fcb449..17bf7c2541 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
+import org.apache.spark.executor.ShuffleWriteMetrics
/**
* Internal utility for micro-benchmarking shuffle write performance.
@@ -56,7 +57,7 @@ object StoragePerfTester {
def writeOutputBytes(mapId: Int, total: AtomicLong) = {
val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
- new KryoSerializer(sc.conf))
+ new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {
writers(i % numOutputSplits).write(writeData)