diff options
author | Raymond Liu <raymond.liu@intel.com> | 2014-08-29 23:05:18 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-08-29 23:05:18 -0700 |
commit | acea92806c91535162a9fdcb1cce579e7f1f91c7 (patch) | |
tree | d5b0869bfb158825deb9148209438f84ab14ea0b /tools/src/main/scala | |
parent | 7e662af332beb171dc89027a2562d0949d69cfa0 (diff) | |
download | spark-acea92806c91535162a9fdcb1cce579e7f1f91c7.tar.gz spark-acea92806c91535162a9fdcb1cce579e7f1f91c7.tar.bz2 spark-acea92806c91535162a9fdcb1cce579e7f1f91c7.zip |
[SPARK-2288] Hide ShuffleBlockManager behind ShuffleManager
By hiding the ShuffleBlockManager behind the ShuffleManager, we decouple the shuffle data's block mapping management work from the DiskBlockManager. This gives a clearer interface and makes it easier for other shuffle managers to implement their own block management logic. The JIRA ticket has more details.
Author: Raymond Liu <raymond.liu@intel.com>
Closes #1241 from colorant/shuffle and squashes the following commits:
0e01ae3 [Raymond Liu] Move ShuffleBlockmanager behind shuffleManager
Diffstat (limited to 'tools/src/main/scala')
-rw-r--r-- | tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 9 |
1 file changed, 6 insertions, 3 deletions
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index 17bf7c2541..db58eb642b 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -20,10 +20,11 @@ package org.apache.spark.tools import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.AtomicLong +import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.SparkContext import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.util.Utils -import org.apache.spark.executor.ShuffleWriteMetrics /** * Internal utility for micro-benchmarking shuffle write performance. @@ -50,13 +51,15 @@ object StoragePerfTester { System.setProperty("spark.shuffle.compress", "false") System.setProperty("spark.shuffle.sync", "true") + System.setProperty("spark.shuffle.manager", + "org.apache.spark.shuffle.hash.HashShuffleManager") // This is only used to instantiate a BlockManager. All thread scheduling is done manually. val sc = new SparkContext("local[4]", "Write Tester") - val blockManager = sc.env.blockManager + val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager] def writeOutputBytes(mapId: Int, total: AtomicLong) = { - val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, + val shuffle = hashShuffleManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, new KryoSerializer(sc.conf), new ShuffleWriteMetrics()) val writers = shuffle.writers for (i <- 1 to recordsPerMap) { |