From acea92806c91535162a9fdcb1cce579e7f1f91c7 Mon Sep 17 00:00:00 2001 From: Raymond Liu Date: Fri, 29 Aug 2014 23:05:18 -0700 Subject: [SPARK-2288] Hide ShuffleBlockManager behind ShuffleManager By Hiding the shuffleblockmanager behind Shufflemanager, we decouple the shuffle data's block mapping management work from Diskblockmananger. This give a more clear interface and more easy for other shuffle manager to implement their own block management logic. the jira ticket have more details. Author: Raymond Liu Closes #1241 from colorant/shuffle and squashes the following commits: 0e01ae3 [Raymond Liu] Move ShuffleBlockmanager behind shuffleManager --- .../main/scala/org/apache/spark/tools/StoragePerfTester.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'tools/src/main') diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index 17bf7c2541..db58eb642b 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -20,10 +20,11 @@ package org.apache.spark.tools import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.AtomicLong +import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.SparkContext import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.util.Utils -import org.apache.spark.executor.ShuffleWriteMetrics /** * Internal utility for micro-benchmarking shuffle write performance. @@ -50,13 +51,15 @@ object StoragePerfTester { System.setProperty("spark.shuffle.compress", "false") System.setProperty("spark.shuffle.sync", "true") + System.setProperty("spark.shuffle.manager", + "org.apache.spark.shuffle.hash.HashShuffleManager") // This is only used to instantiate a BlockManager. All thread scheduling is done manually. val sc = new SparkContext("local[4]", "Write Tester") - val blockManager = sc.env.blockManager + val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager] def writeOutputBytes(mapId: Int, total: AtomicLong) = { - val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, + val shuffle = hashShuffleManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, new KryoSerializer(sc.conf), new ShuffleWriteMetrics()) val writers = shuffle.writers for (i <- 1 to recordsPerMap) { -- cgit v1.2.3