Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 32
1 file changed, 29 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 8fa1215011..145c434a4f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -55,10 +55,21 @@ class BlockManagerMasterEndpoint(
private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)

+ private val topologyMapper = {
+ val topologyMapperClassName = conf.get(
+ "spark.storage.replication.topologyMapper", classOf[DefaultTopologyMapper].getName)
+ val clazz = Utils.classForName(topologyMapperClassName)
+ val mapper =
+ clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper]
+ logInfo(s"Using $topologyMapperClassName for getting topology information")
+ mapper
+ }
+
+ logInfo("BlockManagerMasterEndpoint up")
+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
- register(blockManagerId, maxMemSize, slaveEndpoint)
- context.reply(true)
+ context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))

case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -298,7 +309,21 @@ class BlockManagerMasterEndpoint(
).map(_.flatten.toSeq)
}

- private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
+ /**
+ * Returns the BlockManagerId with topology information populated, if available.
+ */
+ private def register(
+ idWithoutTopologyInfo: BlockManagerId,
+ maxMemSize: Long,
+ slaveEndpoint: RpcEndpointRef): BlockManagerId = {
+ // the dummy id is not expected to contain the topology information.
+ // we get that info here and respond back with a more fleshed out block manager id
+ val id = BlockManagerId(
+ idWithoutTopologyInfo.executorId,
+ idWithoutTopologyInfo.host,
+ idWithoutTopologyInfo.port,
+ topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
+
val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
@@ -318,6 +343,7 @@ class BlockManagerMasterEndpoint(
id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
+ id
}

private def updateBlockInfo(
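
For reference, the new spark.storage.replication.topologyMapper setting is resolved by reflection, so a custom mapper only needs a constructor taking a SparkConf plus an implementation of getTopologyForHost. The sketch below is not part of this commit; it assumes the TopologyMapper base class accepts the SparkConf in its constructor (as DefaultTopologyMapper suggests) and that getTopologyForHost returns an Option[String], matching how its result is passed straight into BlockManagerId(...) in the register method above. The class name RackFromHostnameMapper and the hostname naming convention are made up for the example.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.TopologyMapper

    // Illustrative sketch only: derive a rack name from the second hostname
    // component, e.g. "node12.rack3.example.com" -> Some("rack3"). Assumes the
    // TopologyMapper contract implied by the diff above (SparkConf constructor,
    // getTopologyForHost returning Option[String]).
    class RackFromHostnameMapper(conf: SparkConf) extends TopologyMapper(conf) {
      override def getTopologyForHost(hostname: String): Option[String] = {
        val parts = hostname.split('.')
        if (parts.length > 1) Some(parts(1)) else None
      }
    }

Such a class would be enabled with conf.set("spark.storage.replication.topologyMapper", classOf[RackFromHostnameMapper].getName). With the change above, register(...) then fills in the topology field of the BlockManagerId it builds and replies with that id, rather than the plain true acknowledgement returned before.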