author     Shubham Chopra <schopra31@bloomberg.net>   2017-03-30 22:21:57 +0800
committer  Wenchen Fan <wenchen@databricks.com>       2017-03-30 22:21:57 +0800
commit     b454d4402e5ee7d1a7385d1fe3737581f84d2c72 (patch)
tree       f4f536b7622918d1fd0096f6ab510dd6cdc3ffed /core/src
parent     edc87d76efea7b4d19d9d0c4ddba274a3ccb8752 (diff)
[SPARK-15354][CORE] Topology aware block replication strategies
## What changes were proposed in this pull request?

This patch adds implementations of resilient block replication strategies for different resource managers, mirroring the 3-replica strategy used by HDFS: the first replica is placed on an executor, the second within the same rack as that executor, and the third on a different rack. The implementation provides two pluggable classes: one runs in the driver and supplies topology information for every host at cluster start, and the other prioritizes a list of peer BlockManagerIds. The prioritization itself can be thought of as an optimization problem: find a minimal set of peers that satisfies certain objectives and replicate to those peers first. The objectives can be used to express richer constraints over and above the HDFS-like 3-replica strategy.

## How was this patch tested?

This patch was tested with the existing storage unit tests, along with new unit tests that verify the prioritization behaviour.

Author: Shubham Chopra <schopra31@bloomberg.net>

Closes #13932 from shubhamchopra/PrioritizerStrategy.
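As a usage sketch (not part of this patch), the two pluggable pieces are selected through SparkConf. The configuration keys below are the ones exercised by the new test suite in this diff; BasicBlockReplicationPolicy ships with the patch, while the TopologyMapper class name is a hypothetical placeholder for a user-supplied subclass.

import org.apache.spark.SparkConf
import org.apache.spark.storage.BasicBlockReplicationPolicy

// Wire up the driver-side topology provider and the peer prioritization policy.
// "com.example.RackTopologyMapper" stands in for a user-supplied TopologyMapper
// subclass and is purely illustrative.
val conf = new SparkConf()
  .set("spark.storage.replication.policy", classOf[BasicBlockReplicationPolicy].getName)
  .set("spark.storage.replication.topologyMapper", "com.example.RackTopologyMapper")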
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                    3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala        145
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala   33
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala    73
4 files changed, 222 insertions(+), 32 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index fcda9fa653..46a078b2f9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -49,7 +49,6 @@ import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
-
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
@@ -1258,7 +1257,6 @@ private[spark] class BlockManager(
replication = 1)
val numPeersToReplicateTo = level.replication - 1
-
val startTime = System.nanoTime
var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
@@ -1313,7 +1311,6 @@ private[spark] class BlockManager(
numPeersToReplicateTo - peersReplicatedTo.size)
}
}
-
logDebug(s"Replicating $blockId of ${data.size} bytes to " +
s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
index bb8a684b4c..353eac60df 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
@@ -53,6 +53,46 @@ trait BlockReplicationPolicy {
numReplicas: Int): List[BlockManagerId]
}
+object BlockReplicationUtils {
+ // scalastyle:off line.size.limit
+ /**
+ * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
+ * minimizing space usage. Please see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
+ * here</a>.
+ *
+ * @param n total number of indices
+ * @param m number of samples needed
+ * @param r random number generator
+ * @return list of m random unique indices
+ */
+ // scalastyle:on line.size.limit
+ private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
+ val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int]) {case (set, i) =>
+ val t = r.nextInt(i) + 1
+ if (set.contains(t)) set + i else set + t
+ }
+ indices.map(_ - 1).toList
+ }
+
+ /**
+ * Get a random sample of size m from the elems
+ *
+ * @param elems
+ * @param m number of samples needed
+ * @param r random number generator
+ * @tparam T
+ * @return a random list of size m. If there are fewer than m elements in elems, we just
+ * randomly shuffle elems
+ */
+ def getRandomSample[T](elems: Seq[T], m: Int, r: Random): List[T] = {
+ if (elems.size > m) {
+ getSampleIds(elems.size, m, r).map(elems(_))
+ } else {
+ r.shuffle(elems).toList
+ }
+ }
+}
+
@DeveloperApi
class RandomBlockReplicationPolicy
extends BlockReplicationPolicy
@@ -67,6 +107,7 @@ class RandomBlockReplicationPolicy
* @param peersReplicatedTo Set of peers already replicated to
* @param blockId BlockId of the block being replicated. This can be used as a source of
* randomness if needed.
+ * @param numReplicas Number of peers we need to replicate to
* @return A prioritized list of peers. Lower the index of a peer, higher its priority
*/
override def prioritize(
@@ -78,7 +119,7 @@ class RandomBlockReplicationPolicy
val random = new Random(blockId.hashCode)
logDebug(s"Input peers : ${peers.mkString(", ")}")
val prioritizedPeers = if (peers.size > numReplicas) {
- getSampleIds(peers.size, numReplicas, random).map(peers(_))
+ BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
} else {
if (peers.size < numReplicas) {
logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.")
@@ -88,26 +129,96 @@ class RandomBlockReplicationPolicy
logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
prioritizedPeers
}
+}
+
+@DeveloperApi
+class BasicBlockReplicationPolicy
+ extends BlockReplicationPolicy
+ with Logging {
- // scalastyle:off line.size.limit
/**
- * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
- * minimizing space usage. Please see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
- * here</a>.
+ * Method to prioritize a bunch of candidate peers of a block manager. This implementation
+ * replicates the behavior of block replication in HDFS. For a given number of replicas needed,
+ * we choose a peer within the rack, one outside and remaining blockmanagers are chosen at
+ * random, in that order till we meet the number of replicas needed.
+ * This works best with a total replication factor of 3, like HDFS.
*
- * @param n total number of indices
- * @param m number of samples needed
- * @param r random number generator
- * @return list of m random unique indices
+ * @param blockManagerId Id of the current BlockManager for self identification
+ * @param peers A list of peers of a BlockManager
+ * @param peersReplicatedTo Set of peers already replicated to
+ * @param blockId BlockId of the block being replicated. This can be used as a source of
+ * randomness if needed.
+ * @param numReplicas Number of peers we need to replicate to
+ * @return A prioritized list of peers. Lower the index of a peer, higher its priority
*/
- // scalastyle:on line.size.limit
- private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
- val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (set, i) =>
- val t = r.nextInt(i) + 1
- if (set.contains(t)) set + i else set + t
+ override def prioritize(
+ blockManagerId: BlockManagerId,
+ peers: Seq[BlockManagerId],
+ peersReplicatedTo: mutable.HashSet[BlockManagerId],
+ blockId: BlockId,
+ numReplicas: Int): List[BlockManagerId] = {
+
+ logDebug(s"Input peers : $peers")
+ logDebug(s"BlockManagerId : $blockManagerId")
+
+ val random = new Random(blockId.hashCode)
+
+ // if block doesn't have topology info, we can't do much, so we randomly shuffle
+ // if there is, we see what's needed from peersReplicatedTo and based on numReplicas,
+ // we choose whats needed
+ if (blockManagerId.topologyInfo.isEmpty || numReplicas == 0) {
+ // no topology info for the block. The best we can do is randomly choose peers
+ BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
+ } else {
+ // we have topology information, we see what is left to be done from peersReplicatedTo
+ val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo)
+ val doneOutsideRack = peersReplicatedTo.exists { p =>
+ p.topologyInfo.isDefined && p.topologyInfo != blockManagerId.topologyInfo
+ }
+
+ if (doneOutsideRack && doneWithinRack) {
+ // we are done, we just return a random sample
+ BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
+ } else {
+ // we separate peers within and outside rack
+ val (inRackPeers, outOfRackPeers) = peers
+ .filter(_.host != blockManagerId.host)
+ .partition(_.topologyInfo == blockManagerId.topologyInfo)
+
+ val peerWithinRack = if (doneWithinRack) {
+ // we are done with in-rack replication, so don't need anymore peers
+ Seq.empty
+ } else {
+ if (inRackPeers.isEmpty) {
+ Seq.empty
+ } else {
+ Seq(inRackPeers(random.nextInt(inRackPeers.size)))
+ }
+ }
+
+ val peerOutsideRack = if (doneOutsideRack || numReplicas - peerWithinRack.size <= 0) {
+ Seq.empty
+ } else {
+ if (outOfRackPeers.isEmpty) {
+ Seq.empty
+ } else {
+ Seq(outOfRackPeers(random.nextInt(outOfRackPeers.size)))
+ }
+ }
+
+ val priorityPeers = peerWithinRack ++ peerOutsideRack
+ val numRemainingPeers = numReplicas - priorityPeers.size
+ val remainingPeers = if (numRemainingPeers > 0) {
+ val rPeers = peers.filter(p => !priorityPeers.contains(p))
+ BlockReplicationUtils.getRandomSample(rPeers, numRemainingPeers, random)
+ } else {
+ Seq.empty
+ }
+
+ (priorityPeers ++ remainingPeers).toList
+ }
+
}
- // we shuffle the result to ensure a random arrangement within the sample
- // to avoid any bias from set implementations
- r.shuffle(indices.map(_ - 1).toList)
}
+
}
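For reference, here is a self-contained sketch of the Robert Floyd sampling used by BlockReplicationUtils.getRandomSample above. The method name pickSample and the example call are ours, but the logic mirrors the added code.

import scala.collection.mutable
import scala.util.Random

// Floyd's algorithm: choose m distinct indices from 0 until n using O(m) extra space,
// then map them back onto the input sequence. Mirrors getSampleIds/getRandomSample above.
def pickSample[T](elems: Seq[T], m: Int, r: Random): List[T] = {
  if (elems.size <= m) {
    // fewer (or exactly as many) elements than samples requested: just shuffle them
    r.shuffle(elems).toList
  } else {
    val n = elems.size
    val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int]) { case (set, i) =>
      val t = r.nextInt(i) + 1              // uniform pick in [1, i]
      if (set.contains(t)) set + i else set + t
    }
    indices.toList.map(idx => elems(idx - 1))  // shift to 0-based indices
  }
}

// pickSample((1 to 100).toSeq, 5, new Random(42)) returns 5 distinct elements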
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d5715f8469..13020acdd3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.internal.Logging
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -36,6 +37,7 @@ import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
+import org.apache.spark.util.Utils
trait BlockManagerReplicationBehavior extends SparkFunSuite
with Matchers
@@ -43,6 +45,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
with LocalSparkContext {
val conf: SparkConf
+
protected var rpcEnv: RpcEnv = null
protected var master: BlockManagerMaster = null
protected lazy val securityMgr = new SecurityManager(conf)
@@ -55,7 +58,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
protected val allStores = new ArrayBuffer[BlockManager]
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-
protected lazy val serializer = new KryoSerializer(conf)
// Implicitly convert strings to BlockIds for test clarity.
@@ -471,7 +473,7 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
conf.set("spark.storage.replication.proactive", "true")
conf.set("spark.storage.exceptionOnPinLeak", "true")
- (2 to 5).foreach{ i =>
+ (2 to 5).foreach { i =>
test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
testProactiveReplication(i)
}
@@ -524,3 +526,30 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
}
}
}
+
+class DummyTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
+ // number of racks to test with
+ val numRacks = 3
+
+ /**
+ * Gets the topology information given the host name
+ *
+ * @param hostname Hostname
+ * @return random topology
+ */
+ override def getTopologyForHost(hostname: String): Option[String] = {
+ Some(s"/Rack-${Utils.random.nextInt(numRacks)}")
+ }
+}
+
+class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationBehavior {
+ val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
+ conf.set("spark.kryoserializer.buffer", "1m")
+ conf.set(
+ "spark.storage.replication.policy",
+ classOf[BasicBlockReplicationPolicy].getName)
+ conf.set(
+ "spark.storage.replication.topologyMapper",
+ classOf[DummyTopologyMapper].getName)
+}
+
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala
index 800c3899f1..ecad0f5352 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala
@@ -18,34 +18,34 @@
package org.apache.spark.storage
import scala.collection.mutable
+import scala.util.Random
import org.scalatest.{BeforeAndAfter, Matchers}
import org.apache.spark.{LocalSparkContext, SparkFunSuite}
-class BlockReplicationPolicySuite extends SparkFunSuite
+class RandomBlockReplicationPolicyBehavior extends SparkFunSuite
with Matchers
with BeforeAndAfter
with LocalSparkContext {
// Implicitly convert strings to BlockIds for test clarity.
- private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+ protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+ val replicationPolicy: BlockReplicationPolicy = new RandomBlockReplicationPolicy
+
+ val blockId = "test-block"
/**
* Test if we get the required number of peers when using random sampling from
- * RandomBlockReplicationPolicy
+ * BlockReplicationPolicy
*/
- test(s"block replication - random block replication policy") {
+ test("block replication - random block replication policy") {
val numBlockManagers = 10
val storeSize = 1000
- val blockManagers = (1 to numBlockManagers).map { i =>
- BlockManagerId(s"store-$i", "localhost", 1000 + i, None)
- }
+ val blockManagers = generateBlockManagerIds(numBlockManagers, Seq("/Rack-1"))
val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
- val replicationPolicy = new RandomBlockReplicationPolicy
- val blockId = "test-block"
- (1 to 10).foreach {numReplicas =>
+ (1 to 10).foreach { numReplicas =>
logDebug(s"Num replicas : $numReplicas")
val randomPeers = replicationPolicy.prioritize(
candidateBlockManager,
@@ -68,7 +68,60 @@ class BlockReplicationPolicySuite extends SparkFunSuite
logDebug(s"Random peers : ${secondPass.mkString(", ")}")
assert(secondPass.toSet.size === numReplicas)
}
+ }
+
+ protected def generateBlockManagerIds(count: Int, racks: Seq[String]): Seq[BlockManagerId] = {
+ (1 to count).map{i =>
+ BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size))))
+ }
+ }
+}
+
+class TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplicationPolicyBehavior {
+ override val replicationPolicy = new BasicBlockReplicationPolicy
+
+ test("All peers in the same rack") {
+ val racks = Seq("/default-rack")
+ val numBlockManager = 10
+ (1 to 10).foreach {numReplicas =>
+ val peers = generateBlockManagerIds(numBlockManager, racks)
+ val blockManager = BlockManagerId("Driver", "Host-driver", 10001, Some(racks.head))
+
+ val prioritizedPeers = replicationPolicy.prioritize(
+ blockManager,
+ peers,
+ mutable.HashSet.empty,
+ blockId,
+ numReplicas
+ )
+ assert(prioritizedPeers.toSet.size == numReplicas)
+ assert(prioritizedPeers.forall(p => p.host != blockManager.host))
+ }
}
+ test("Peers in 2 racks") {
+ val racks = Seq("/Rack-1", "/Rack-2")
+ (1 to 10).foreach {numReplicas =>
+ val peers = generateBlockManagerIds(10, racks)
+ val blockManager = BlockManagerId("Driver", "Host-driver", 9001, Some(racks.head))
+
+ val prioritizedPeers = replicationPolicy.prioritize(
+ blockManager,
+ peers,
+ mutable.HashSet.empty,
+ blockId,
+ numReplicas
+ )
+
+ assert(prioritizedPeers.toSet.size == numReplicas)
+ val priorityPeers = prioritizedPeers.take(2)
+ assert(priorityPeers.forall(p => p.host != blockManager.host))
+ if(numReplicas > 1) {
+ // both these conditions should be satisfied when numReplicas > 1
+ assert(priorityPeers.exists(p => p.topologyInfo == blockManager.topologyInfo))
+ assert(priorityPeers.exists(p => p.topologyInfo != blockManager.topologyInfo))
+ }
+ }
+ }
}
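Finally, as an illustration of the extension point this patch opens up, here is a hedged sketch of a third-party policy that could be plugged in via spark.storage.replication.policy. The class name and its selection rule are invented for this example and are not part of the patch; only the prioritize signature and the sampling helper come from the diff above.

import scala.collection.mutable
import scala.util.Random

import org.apache.spark.storage.{BlockId, BlockManagerId, BlockReplicationPolicy, BlockReplicationUtils}

// A hypothetical policy: never replicate back to the caller's own host or to peers that
// already hold the block, then fall back to the same random sampling helper used above.
class ExcludeSelfHostPolicy extends BlockReplicationPolicy {
  override def prioritize(
      blockManagerId: BlockManagerId,
      peers: Seq[BlockManagerId],
      peersReplicatedTo: mutable.HashSet[BlockManagerId],
      blockId: BlockId,
      numReplicas: Int): List[BlockManagerId] = {
    // seed with the block id so the choice is deterministic per block, as the
    // built-in policies in this patch do
    val random = new Random(blockId.hashCode)
    val candidates = peers.filter { p =>
      p.host != blockManagerId.host && !peersReplicatedTo.contains(p)
    }
    BlockReplicationUtils.getRandomSample(candidates, numReplicas, random)
  }
}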