author    Josh Rosen <joshrosen@apache.org>    2014-08-16 00:04:55 -0700
committer Josh Rosen <joshrosen@apache.org>    2014-08-16 00:04:55 -0700
commit    20fcf3d0b72f3707dc1ed95d453f570fabdefd16 (patch)
tree      44d1b667829ec26aa766b21b0bd2e5cd6c7e807b /core
parent    a83c7723bf7a90dc6cd5dde98a179303b7542020 (diff)
[SPARK-2977] Ensure ShuffleManager is created before ShuffleBlockManager
This is intended to fix SPARK-2977. Before, there was an implicit ordering dependency where we needed to know the ShuffleManager implementation before creating the ShuffleBlockManager. This patch makes that dependency explicit by adding ShuffleManager to a bunch of constructors.

I think it's a little odd for BlockManager to take a ShuffleManager only to pass it to ShuffleBlockManager without using it itself; there's an opportunity to clean this up later if we sever the circular dependencies between BlockManager and other components and pass those components to BlockManager's constructor.

Author: Josh Rosen <joshrosen@apache.org>

Closes #1976 from JoshRosen/SPARK-2977 and squashes the following commits:

a9cd1e1 [Josh Rosen] [SPARK-2977] Ensure ShuffleManager is created before ShuffleBlockManager.
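As a rough illustration of the ordering this patch makes explicit, here is a minimal, self-contained sketch using simplified stand-in types (not the real Spark classes, which take many more constructor parameters):

trait ShuffleManager
class HashShuffleManager extends ShuffleManager
class SortShuffleManager extends ShuffleManager

// ShuffleBlockManager now receives the ShuffleManager explicitly, so it can
// inspect the live instance instead of re-deriving it from configuration.
class ShuffleBlockManager(shuffleManager: ShuffleManager) {
  val sortBasedShuffle: Boolean = shuffleManager.isInstanceOf[SortShuffleManager]
}

// BlockManager accepts the ShuffleManager only to hand it down, as the
// commit message notes.
class BlockManager(shuffleManager: ShuffleManager) {
  val shuffleBlockManager = new ShuffleBlockManager(shuffleManager)
}

object OrderingSketch extends App {
  val shuffleManager = new SortShuffleManager // constructed first, as in SparkEnv
  val blockManager = new BlockManager(shuffleManager)
  println(blockManager.shuffleBlockManager.sortBasedShuffle) // prints: true
}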
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                       22
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala           11
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala     7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala           3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala      12
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala   8
6 files changed, 37 insertions, 26 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 22d8d1cb1d..fc36e37c53 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -210,12 +210,22 @@ object SparkEnv extends Logging {
"MapOutputTracker",
new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
+ // Let the user specify short names for shuffle managers
+ val shortShuffleMgrNames = Map(
+ "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+ "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+ val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
+ val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
+ val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
+
+ val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
- serializer, conf, securityManager, mapOutputTracker)
+ serializer, conf, securityManager, mapOutputTracker, shuffleManager)
val connectionManager = blockManager.connectionManager
@@ -250,16 +260,6 @@ object SparkEnv extends Logging {
"."
}
- // Let the user specify short names for shuffle managers
- val shortShuffleMgrNames = Map(
- "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
- "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
- val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
- val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
- val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
-
- val shuffleMemoryManager = new ShuffleMemoryManager(conf)
-
// Warn about deprecated spark.cache.class property
if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e8bbd298c6..e4c3d58905 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -33,6 +33,7 @@ import org.apache.spark.executor._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.util._
private[spark] sealed trait BlockValues
@@ -57,11 +58,12 @@ private[spark] class BlockManager(
maxMemory: Long,
val conf: SparkConf,
securityManager: SecurityManager,
- mapOutputTracker: MapOutputTracker)
+ mapOutputTracker: MapOutputTracker,
+ shuffleManager: ShuffleManager)
extends Logging {
private val port = conf.getInt("spark.blockManager.port", 0)
- val shuffleBlockManager = new ShuffleBlockManager(this)
+ val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
val connectionManager =
@@ -142,9 +144,10 @@ private[spark] class BlockManager(
serializer: Serializer,
conf: SparkConf,
securityManager: SecurityManager,
- mapOutputTracker: MapOutputTracker) = {
+ mapOutputTracker: MapOutputTracker,
+ shuffleManager: ShuffleManager) = {
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
- conf, securityManager, mapOutputTracker)
+ conf, securityManager, mapOutputTracker, shuffleManager)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 3565719b54..b8f5d3a5b0 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
@@ -62,7 +63,8 @@ private[spark] trait ShuffleWriterGroup {
*/
// TODO: Factor this into a separate class for each ShuffleManager implementation
private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
+class ShuffleBlockManager(blockManager: BlockManager,
+ shuffleManager: ShuffleManager) extends Logging {
def conf = blockManager.conf
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -71,8 +73,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
conf.getBoolean("spark.shuffle.consolidateFiles", false)
// Are we using sort-based shuffle?
- val sortBasedShuffle =
- conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName
+ val sortBasedShuffle = shuffleManager.isInstanceOf[SortShuffleManager]
private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
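One motivation for the isInstanceOf check above, as inferred from this diff rather than stated in the commit message: once short names are allowed, spark.shuffle.manager may hold "sort" rather than the SortShuffleManager class name, so the old string comparison would silently evaluate to false. A minimal sketch with stand-in types:

trait ShuffleManager
class SortShuffleManager extends ShuffleManager

object InstanceCheckSketch extends App {
  val configValue = "sort" // short name, now permitted by SparkEnv
  val manager: ShuffleManager = new SortShuffleManager

  // Old check: compare the raw config string against the class name -> false.
  val byName = configValue == classOf[SortShuffleManager].getName
  // New check: ask the already-constructed instance directly -> true.
  val byInstance = manager.isInstanceOf[SortShuffleManager]

  println(s"byName=$byName byInstance=$byInstance") // byName=false byInstance=true
}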
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 75c2e09a6b..aa83ea90ee 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.util.concurrent.ArrayBlockingQueue
import akka.actor._
+import org.apache.spark.shuffle.hash.HashShuffleManager
import util.Random
import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
@@ -101,7 +102,7 @@ private[spark] object ThreadingTest {
conf)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
- new SecurityManager(conf), new MapOutputTrackerMaster(conf))
+ new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf))
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 94bb2c445d..20bac66105 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
+import org.apache.spark.shuffle.hash.HashShuffleManager
import org.mockito.invocation.InvocationOnMock
import org.mockito.Matchers.any
@@ -61,6 +62,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
conf.set("spark.authenticate", "false")
val securityMgr = new SecurityManager(conf)
val mapOutputTracker = new MapOutputTrackerMaster(conf)
+ val shuffleManager = new HashShuffleManager(conf)
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
conf.set("spark.kryoserializer.buffer.mb", "1")
@@ -71,8 +73,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
- new BlockManager(
- name, actorSystem, master, serializer, maxMem, conf, securityMgr, mapOutputTracker)
+ new BlockManager(name, actorSystem, master, serializer, maxMem, conf, securityMgr,
+ mapOutputTracker, shuffleManager)
}
before {
@@ -791,7 +793,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf,
- securityMgr, mapOutputTracker)
+ securityMgr, mapOutputTracker, shuffleManager)
// The put should fail since a1 is not serializable.
class UnserializableClass
@@ -1007,7 +1009,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("return error message when error occurred in BlockManagerWorker#onBlockMessageReceive") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
+ securityMgr, mapOutputTracker, shuffleManager)
val worker = spy(new BlockManagerWorker(store))
val connManagerId = mock(classOf[ConnectionManagerId])
@@ -1054,7 +1056,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("return ack message when no error occurred in BlocManagerWorker#onBlockMessageReceive") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
+ securityMgr, mapOutputTracker, shuffleManager)
val worker = spy(new BlockManagerWorker(store))
val connManagerId = mock(classOf[ConnectionManagerId])
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index b8299e2ea1..777579bc57 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.storage
import java.io.{File, FileWriter}
+import org.apache.spark.shuffle.hash.HashShuffleManager
+
import scala.collection.mutable
import scala.language.reflectiveCalls
@@ -42,7 +44,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
// so we coerce consolidation if not already enabled.
testConf.set("spark.shuffle.consolidateFiles", "true")
- val shuffleBlockManager = new ShuffleBlockManager(null) {
+ private val shuffleManager = new HashShuffleManager(testConf.clone)
+
+ val shuffleBlockManager = new ShuffleBlockManager(null, shuffleManager) {
override def conf = testConf.clone
var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
@@ -148,7 +152,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
confCopy)
val store = new BlockManager("<driver>", actorSystem, master , serializer, confCopy,
- securityManager, null)
+ securityManager, null, shuffleManager)
try {