Diffstat (limited to 'tools')
-rw-r--r--  tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala  9
1 file changed, 6 insertions(+), 3 deletions(-)
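The diff below switches StoragePerfTester from reaching into blockManager.shuffleBlockManager to going through the pluggable shuffle manager exposed by SparkEnv, pinned to HashShuffleManager via the spark.shuffle.manager property. The following is a minimal sketch of that access pattern, not part of the commit; it assumes the same era of the API (SparkEnv carries a shuffleManager field, and sc.env is visible inside the org.apache.spark package tree), and the object name is illustrative only.

// Minimal sketch of the access pattern this change introduces; not part of
// the commit. It sits in the same package as StoragePerfTester so that the
// private[spark] accessor sc.env is visible. Object name is illustrative.
package org.apache.spark.tools

import org.apache.spark.SparkContext
import org.apache.spark.shuffle.hash.HashShuffleManager

object ShuffleManagerAccessSketch {
  def main(args: Array[String]): Unit = {
    // Select the hash-based shuffle implementation explicitly, as the
    // benchmark now does before creating its SparkContext.
    System.setProperty("spark.shuffle.manager",
      "org.apache.spark.shuffle.hash.HashShuffleManager")

    val sc = new SparkContext("local[4]", "Write Tester")

    // Instead of grabbing shuffleBlockManager off the BlockManager, the
    // shuffle machinery is obtained from SparkEnv and downcast to the
    // configured implementation.
    val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
    println(hashShuffleManager.getClass.getName)

    sc.stop()
  }
}

Setting spark.shuffle.manager explicitly matters here: the downcast to HashShuffleManager would throw a ClassCastException if a different shuffle implementation were configured.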
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 17bf7c2541..db58eb642b 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -20,10 +20,11 @@ package org.apache.spark.tools
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.util.Utils
-import org.apache.spark.executor.ShuffleWriteMetrics
/**
* Internal utility for micro-benchmarking shuffle write performance.
@@ -50,13 +51,15 @@ object StoragePerfTester {
System.setProperty("spark.shuffle.compress", "false")
System.setProperty("spark.shuffle.sync", "true")
+ System.setProperty("spark.shuffle.manager",
+ "org.apache.spark.shuffle.hash.HashShuffleManager")
// This is only used to instantiate a BlockManager. All thread scheduling is done manually.
val sc = new SparkContext("local[4]", "Write Tester")
- val blockManager = sc.env.blockManager
+ val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
def writeOutputBytes(mapId: Int, total: AtomicLong) = {
- val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
+ val shuffle = hashShuffleManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {