diff options
author | Takeshi YAMAMURO <linguin.m.s@gmail.com> | 2015-12-21 14:02:40 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-12-21 14:03:23 -0800 |
commit | 935f46630685306edbdec91f71710703317fe129 (patch) | |
tree | 7f87dfc225dd5ae8070162b61b2f6d1a6fcd9c4b /core/src | |
parent | 4883a5087d481d4de5d3beabbd709853de01399a (diff) | |
download | spark-935f46630685306edbdec91f71710703317fe129.tar.gz spark-935f46630685306edbdec91f71710703317fe129.tar.bz2 spark-935f46630685306edbdec91f71710703317fe129.zip |
[SPARK-12392][CORE] Optimize a location order of broadcast blocks by considering preferred local hosts
When multiple workers exist in a host, we can bypass unnecessary remote access for broadcasts; block managers fetch broadcast blocks from the same host instead of remote hosts.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes #10346 from maropu/OptimizeBlockLocationOrder.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 12 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 19 |
2 files changed, 29 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6074fc58d7..b5b7804d54 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -578,9 +578,19 @@ private[spark] class BlockManager( doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } + /** + * Return a list of locations for the given block, prioritizing the local machine since + * multiple block managers can share the same host. + */ + private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { + val locs = Random.shuffle(master.getLocations(blockId)) + val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } + preferredLocs ++ otherLocs + } + private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") - val locations = Random.shuffle(master.getLocations(blockId)) + val locations = getLocations(blockId) var numFetchFailures = 0 for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 53991d8a1a..bf49be3d4c 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -26,6 +26,7 @@ import scala.language.implicitConversions import scala.language.postfixOps import org.mockito.Mockito.{mock, when} +import org.mockito.{Matchers => mc} import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ @@ -66,7 +67,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE private def makeBlockManager( maxMem: Long, - name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { + name: String = SparkContext.DRIVER_IDENTIFIER, + master: BlockManagerMaster = this.master): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, @@ -451,6 +453,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } + test("optimize a location order of blocks") { + val localHost = Utils.localHostName() + val otherHost = "otherHost" + val bmMaster = mock(classOf[BlockManagerMaster]) + val bmId1 = BlockManagerId("id1", localHost, 1) + val bmId2 = BlockManagerId("id2", localHost, 2) + val bmId3 = BlockManagerId("id3", otherHost, 3) + when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3)) + + val blockManager = makeBlockManager(128, "exec", bmMaster) + val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) + val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) + assert(locations.map(_.host) === Seq(localHost, localHost, otherHost)) + } + test("SPARK-9591: getRemoteBytes from another location when Exception throw") { val origTimeoutOpt = conf.getOption("spark.network.timeout") try { |