aboutsummaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorRaymond Liu <raymond.liu@intel.com>2014-08-29 23:05:18 -0700
committerReynold Xin <rxin@apache.org>2014-08-29 23:05:18 -0700
commitacea92806c91535162a9fdcb1cce579e7f1f91c7 (patch)
treed5b0869bfb158825deb9148209438f84ab14ea0b /tools
parent7e662af332beb171dc89027a2562d0949d69cfa0 (diff)
downloadspark-acea92806c91535162a9fdcb1cce579e7f1f91c7.tar.gz
spark-acea92806c91535162a9fdcb1cce579e7f1f91c7.tar.bz2
spark-acea92806c91535162a9fdcb1cce579e7f1f91c7.zip
[SPARK-2288] Hide ShuffleBlockManager behind ShuffleManager
By Hiding the shuffleblockmanager behind Shufflemanager, we decouple the shuffle data's block mapping management work from Diskblockmananger. This give a more clear interface and more easy for other shuffle manager to implement their own block management logic. the jira ticket have more details. Author: Raymond Liu <raymond.liu@intel.com> Closes #1241 from colorant/shuffle and squashes the following commits: 0e01ae3 [Raymond Liu] Move ShuffleBlockmanager behind shuffleManager
Diffstat (limited to 'tools')
-rw-r--r--tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 17bf7c2541..db58eb642b 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -20,10 +20,11 @@ package org.apache.spark.tools
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.util.Utils
-import org.apache.spark.executor.ShuffleWriteMetrics
/**
* Internal utility for micro-benchmarking shuffle write performance.
@@ -50,13 +51,15 @@ object StoragePerfTester {
System.setProperty("spark.shuffle.compress", "false")
System.setProperty("spark.shuffle.sync", "true")
+ System.setProperty("spark.shuffle.manager",
+ "org.apache.spark.shuffle.hash.HashShuffleManager")
// This is only used to instantiate a BlockManager. All thread scheduling is done manually.
val sc = new SparkContext("local[4]", "Write Tester")
- val blockManager = sc.env.blockManager
+ val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
def writeOutputBytes(mapId: Int, total: AtomicLong) = {
- val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
+ val shuffle = hashShuffleManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {