aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Takeshi YAMAMURO <linguin.m.s@gmail.com> 2015-12-21 14:02:40 -0800
committer Andrew Or <andrew@databricks.com> 2015-12-21 14:03:23 -0800
commit 935f46630685306edbdec91f71710703317fe129 (patch)
tree 7f87dfc225dd5ae8070162b61b2f6d1a6fcd9c4b
parent 4883a5087d481d4de5d3beabbd709853de01399a (diff)
download spark-935f46630685306edbdec91f71710703317fe129.tar.gz
spark-935f46630685306edbdec91f71710703317fe129.tar.bz2
spark-935f46630685306edbdec91f71710703317fe129.zip
[SPARK-12392][CORE] Optimize a location order of broadcast blocks by considering preferred local hosts
When multiple workers exist on the same host, we can bypass unnecessary remote access for broadcasts; block managers try to fetch broadcast blocks from the same host before falling back to remote hosts.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #10346 from maropu/OptimizeBlockLocationOrder.
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala 12
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 19
2 files changed, 29 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6074fc58d7..b5b7804d54 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -578,9 +578,19 @@ private[spark] class BlockManager(
doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
+ /**
+ * Return a list of locations for the given block, prioritizing the local machine since
+ * multiple block managers can share the same host.
+ */
+ private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
+ val locs = Random.shuffle(master.getLocations(blockId))
+ val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
+ preferredLocs ++ otherLocs
+ }
+
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
- val locations = Random.shuffle(master.getLocations(blockId))
+ val locations = getLocations(blockId)
var numFetchFailures = 0
for (loc <- locations) {
logDebug(s"Getting remote block $blockId from $loc")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 53991d8a1a..bf49be3d4c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -26,6 +26,7 @@ import scala.language.implicitConversions
import scala.language.postfixOps
import org.mockito.Mockito.{mock, when}
+import org.mockito.{Matchers => mc}
import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._
@@ -66,7 +67,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
private def makeBlockManager(
maxMem: Long,
- name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+ name: String = SparkContext.DRIVER_IDENTIFIER,
+ master: BlockManagerMaster = this.master): BlockManager = {
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
@@ -451,6 +453,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
}
+ test("optimize a location order of blocks") {
+ val localHost = Utils.localHostName()
+ val otherHost = "otherHost"
+ val bmMaster = mock(classOf[BlockManagerMaster])
+ val bmId1 = BlockManagerId("id1", localHost, 1)
+ val bmId2 = BlockManagerId("id2", localHost, 2)
+ val bmId3 = BlockManagerId("id3", otherHost, 3)
+ when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3))
+
+ val blockManager = makeBlockManager(128, "exec", bmMaster)
+ val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
+ val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
+ assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+ }
+
test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
val origTimeoutOpt = conf.getOption("spark.network.timeout")
try {