From 20fcf3d0b72f3707dc1ed95d453f570fabdefd16 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 16 Aug 2014 00:04:55 -0700 Subject: [SPARK-2977] Ensure ShuffleManager is created before ShuffleBlockManager This is intended to fix SPARK-2977. Before, there was an implicit ordering dependency where we needed to know the ShuffleManager implementation before creating the ShuffleBlockManager. This patch makes that dependency explicit by adding ShuffleManager to a bunch of constructors. I think it's a little odd for BlockManager to take a ShuffleManager only to pass it to ShuffleBlockManager without using it itself; there's an opportunity to clean this up later if we sever the circular dependencies between BlockManager and other components and pass those components to BlockManager's constructor. Author: Josh Rosen Closes #1976 from JoshRosen/SPARK-2977 and squashes the following commits: a9cd1e1 [Josh Rosen] [SPARK-2977] Ensure ShuffleManager is created before ShuffleBlockManager. --- .../src/main/scala/org/apache/spark/SparkEnv.scala | 22 +++++++++++----------- .../org/apache/spark/storage/BlockManager.scala | 11 +++++++---- .../apache/spark/storage/ShuffleBlockManager.scala | 7 ++++--- .../org/apache/spark/storage/ThreadingTest.scala | 3 ++- .../apache/spark/storage/BlockManagerSuite.scala | 12 +++++++----- .../spark/storage/DiskBlockManagerSuite.scala | 8 ++++++-- 6 files changed, 37 insertions(+), 26 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 22d8d1cb1d..fc36e37c53 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -210,12 +210,22 @@ object SparkEnv extends Logging { "MapOutputTracker", new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) + // Let the user specify short names for shuffle managers + val shortShuffleMgrNames = Map( + "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", + "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") + val shuffleMgrName = conf.get("spark.shuffle.manager", "hash") + val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) + val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) + + val shuffleMemoryManager = new ShuffleMemoryManager(conf) + val blockManagerMaster = new BlockManagerMaster(registerOrLookup( "BlockManagerMaster", new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf) val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, - serializer, conf, securityManager, mapOutputTracker) + serializer, conf, securityManager, mapOutputTracker, shuffleManager) val connectionManager = blockManager.connectionManager @@ -250,16 +260,6 @@ object SparkEnv extends Logging { "." } - // Let the user specify short names for shuffle managers - val shortShuffleMgrNames = Map( - "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", - "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") - val shuffleMgrName = conf.get("spark.shuffle.manager", "hash") - val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) - val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) - - val shuffleMemoryManager = new ShuffleMemoryManager(conf) - // Warn about deprecated spark.cache.class property if (conf.contains("spark.cache.class")) { logWarning("The spark.cache.class property is no longer being used! Specify storage " + diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e8bbd298c6..e4c3d58905 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -33,6 +33,7 @@ import org.apache.spark.executor._ import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.util._ private[spark] sealed trait BlockValues @@ -57,11 +58,12 @@ private[spark] class BlockManager( maxMemory: Long, val conf: SparkConf, securityManager: SecurityManager, - mapOutputTracker: MapOutputTracker) + mapOutputTracker: MapOutputTracker, + shuffleManager: ShuffleManager) extends Logging { private val port = conf.getInt("spark.blockManager.port", 0) - val shuffleBlockManager = new ShuffleBlockManager(this) + val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager) val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) val connectionManager = @@ -142,9 +144,10 @@ private[spark] class BlockManager( serializer: Serializer, conf: SparkConf, securityManager: SecurityManager, - mapOutputTracker: MapOutputTracker) = { + mapOutputTracker: MapOutputTracker, + shuffleManager: ShuffleManager) = { this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), - conf, securityManager, mapOutputTracker) + conf, securityManager, mapOutputTracker, shuffleManager) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 3565719b54..b8f5d3a5b0 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.Logging import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap} import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} @@ -62,7 +63,8 @@ private[spark] trait ShuffleWriterGroup { */ // TODO: Factor this into a separate class for each ShuffleManager implementation private[spark] -class ShuffleBlockManager(blockManager: BlockManager) extends Logging { +class ShuffleBlockManager(blockManager: BlockManager, + shuffleManager: ShuffleManager) extends Logging { def conf = blockManager.conf // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. @@ -71,8 +73,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { conf.getBoolean("spark.shuffle.consolidateFiles", false) // Are we using sort-based shuffle? - val sortBasedShuffle = - conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName + val sortBasedShuffle = shuffleManager.isInstanceOf[SortShuffleManager] private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index 75c2e09a6b..aa83ea90ee 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import java.util.concurrent.ArrayBlockingQueue import akka.actor._ +import org.apache.spark.shuffle.hash.HashShuffleManager import util.Random import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf} @@ -101,7 +102,7 @@ private[spark] object ThreadingTest { conf) val blockManager = new BlockManager( "", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf, - new SecurityManager(conf), new MapOutputTrackerMaster(conf)) + new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf)) val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) producers.foreach(_.start) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 94bb2c445d..20bac66105 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit import akka.actor._ import akka.pattern.ask import akka.util.Timeout +import org.apache.spark.shuffle.hash.HashShuffleManager import org.mockito.invocation.InvocationOnMock import org.mockito.Matchers.any @@ -61,6 +62,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter conf.set("spark.authenticate", "false") val securityMgr = new SecurityManager(conf) val mapOutputTracker = new MapOutputTrackerMaster(conf) + val shuffleManager = new HashShuffleManager(conf) // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test conf.set("spark.kryoserializer.buffer.mb", "1") @@ -71,8 +73,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId) private def makeBlockManager(maxMem: Long, name: String = ""): BlockManager = { - new BlockManager( - name, actorSystem, master, serializer, maxMem, conf, securityMgr, mapOutputTracker) + new BlockManager(name, actorSystem, master, serializer, maxMem, conf, securityMgr, + mapOutputTracker, shuffleManager) } before { @@ -791,7 +793,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter test("block store put failure") { // Use Java serializer so we can create an unserializable error. store = new BlockManager("", actorSystem, master, new JavaSerializer(conf), 1200, conf, - securityMgr, mapOutputTracker) + securityMgr, mapOutputTracker, shuffleManager) // The put should fail since a1 is not serializable. class UnserializableClass @@ -1007,7 +1009,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter test("return error message when error occurred in BlockManagerWorker#onBlockMessageReceive") { store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) + securityMgr, mapOutputTracker, shuffleManager) val worker = spy(new BlockManagerWorker(store)) val connManagerId = mock(classOf[ConnectionManagerId]) @@ -1054,7 +1056,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter test("return ack message when no error occurred in BlocManagerWorker#onBlockMessageReceive") { store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) + securityMgr, mapOutputTracker, shuffleManager) val worker = spy(new BlockManagerWorker(store)) val connManagerId = mock(classOf[ConnectionManagerId]) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index b8299e2ea1..777579bc57 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.storage import java.io.{File, FileWriter} +import org.apache.spark.shuffle.hash.HashShuffleManager + import scala.collection.mutable import scala.language.reflectiveCalls @@ -42,7 +44,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before // so we coerce consolidation if not already enabled. testConf.set("spark.shuffle.consolidateFiles", "true") - val shuffleBlockManager = new ShuffleBlockManager(null) { + private val shuffleManager = new HashShuffleManager(testConf.clone) + + val shuffleBlockManager = new ShuffleBlockManager(null, shuffleManager) { override def conf = testConf.clone var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]() override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id) @@ -148,7 +152,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))), confCopy) val store = new BlockManager("", actorSystem, master , serializer, confCopy, - securityManager, null) + securityManager, null, shuffleManager) try { -- cgit v1.2.3