author    Shubham Chopra <schopra31@bloomberg.net>  2017-02-24 15:40:01 -0800
committer Wenchen Fan <wenchen@databricks.com>      2017-02-24 15:40:01 -0800
commit    fa7c582e9442b985a0493fb1dd15b3fb9b6031b4 (patch)
tree      e350bf3065324f11c19c77366b2cb33a5646b95b /core/src/main/scala/org
parent    330c3e33bd10f035f49cf3d13357eb2d6d90dabc (diff)
[SPARK-15355][CORE] Proactive block replication
## What changes were proposed in this pull request?

We propose adding pro-active block replication in case of executor failures. BlockManagerMasterEndpoint does all the book-keeping to keep track of all the executors and the blocks they hold. It also keeps track of which executors are alive through heartbeats. When an executor is removed, all this book-keeping state is updated to reflect the lost executor. This step can be used to identify the executors that still hold a copy of the cached data, and a message can be sent to one of them asking it to use the existing "replicate" function to find and place new replicas on other suitable hosts. Blocks replicated this way let the master know of their existence. Because this happens as soon as an executor is lost, replication is pro-active rather than deferred to query time.

## How was this patch tested?

This patch was tested with existing unit tests along with new unit tests added to test the functionality.

Author: Shubham Chopra <schopra31@bloomberg.net>

Closes #14412 from shubhamchopra/ProactiveBlockReplication.
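For context, a minimal sketch of how an application would opt in once this lands, assuming only the `spark.storage.replication.proactive` flag introduced in this patch (the app name and data are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Opt in to pro-active replication; the flag added by this patch
// defaults to "false".
val conf = new SparkConf()
  .setAppName("proactive-replication-demo") // illustrative name
  .set("spark.storage.replication.proactive", "true")
val sc = new SparkContext(conf)

// Cache with two replicas. If an executor holding one copy is lost, the
// master now asks a surviving holder to re-replicate the block instead of
// leaving the cache under-replicated until the next read.
val data = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_ONLY_2)
data.count() // materialize the cached blocks
```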
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                43
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala  24
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala          4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala     4
4 files changed, 69 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6946a98cdd..45b7338080 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1160,6 +1160,34 @@ private[spark] class BlockManager(
}
/**
+ * Called for pro-active replenishment of blocks lost due to executor failures
+ *
+ * @param blockId blockId being replicated
+ * @param existingReplicas existing block managers that have a replica
+ * @param maxReplicas maximum replicas needed
+ */
+ def replicateBlock(
+ blockId: BlockId,
+ existingReplicas: Set[BlockManagerId],
+ maxReplicas: Int): Unit = {
+ logInfo(s"Pro-actively replicating $blockId")
+ blockInfoManager.lockForReading(blockId).foreach { info =>
+ val data = doGetLocalBytes(blockId, info)
+ val storageLevel = StorageLevel(
+ useDisk = info.level.useDisk,
+ useMemory = info.level.useMemory,
+ useOffHeap = info.level.useOffHeap,
+ deserialized = info.level.deserialized,
+ replication = maxReplicas)
+ try {
+ replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
+ } finally {
+ releaseLock(blockId)
+ }
+ }
+ }
+
+ /**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*/
@@ -1167,7 +1195,8 @@ private[spark] class BlockManager(
blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
- classTag: ClassTag[_]): Unit = {
+ classTag: ClassTag[_],
+ existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val tLevel = StorageLevel(
@@ -1181,20 +1210,22 @@ private[spark] class BlockManager(
val startTime = System.nanoTime
- var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+ var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
var numFailures = 0
+ val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))
+
var peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId,
- getPeers(false),
- mutable.HashSet.empty,
+ initialPeers,
+ peersReplicatedTo,
blockId,
numPeersToReplicateTo)
while(numFailures <= maxReplicationFailures &&
- !peersForReplication.isEmpty &&
- peersReplicatedTo.size != numPeersToReplicateTo) {
+ !peersForReplication.isEmpty &&
+ peersReplicatedTo.size < numPeersToReplicateTo) {
val peer = peersForReplication.head
try {
val onePeerStartTime = System.nanoTime
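The essential change to `replicate` above is that peers already holding the block are seeded into `peersReplicatedTo` and removed from the candidate list, so only the missing copies are created. A simplified, self-contained sketch of that seeding (the `Peer` type and `prioritize` stub are illustrative stand-ins for `BlockManagerId` and `BlockReplicationPolicy.prioritize`):

```scala
import scala.collection.mutable

case class Peer(host: String)

// Hypothetical prioritizer; Spark delegates this to BlockReplicationPolicy.
def prioritize(candidates: Seq[Peer], needed: Int): Seq[Peer] =
  candidates.take(needed)

// Existing holders count toward the target and are excluded as candidates,
// mirroring the seeding of peersReplicatedTo with existingReplicas above.
def peersToTopUp(allPeers: Seq[Peer], existing: Set[Peer], maxReplicas: Int): Seq[Peer] = {
  val replicatedTo = mutable.HashSet.empty[Peer] ++ existing
  val candidates = allPeers.filterNot(existing.contains)
  val target = maxReplicas - 1 // one copy already lives on this node
  val chosen = mutable.ArrayBuffer.empty[Peer]
  for (peer <- prioritize(candidates, target) if replicatedTo.size < target) {
    replicatedTo += peer
    chosen += peer // stand-in for the actual block transfer
  }
  chosen.toSeq
}

// A block had 3 copies; one executor died and one other survivor remains,
// so exactly one new target is chosen.
val picked = peersToTopUp(Seq(Peer("a"), Peer("b"), Peer("c")), Set(Peer("a")), maxReplicas = 3)
// picked == Seq(Peer("b"))
```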
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 145c434a4f..84c04d2260 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -22,6 +22,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
@@ -65,6 +66,8 @@ class BlockManagerMasterEndpoint(
mapper
}
+ val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
logInfo("BlockManagerMasterEndpoint up")
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -195,17 +198,38 @@ class BlockManagerMasterEndpoint(
// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
+
val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
locations -= blockManagerId
+ // De-register the block if none of the block managers have it. Otherwise, if pro-active
+ // replication is enabled, and a block is either an RDD or a test block (the latter is used
+ // for unit testing), we send a message to a randomly chosen executor location to replicate
+ // the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
+ // etc.) as replication doesn't make much sense in that context.
if (locations.size == 0) {
blockLocations.remove(blockId)
+ logWarning(s"No more replicas available for $blockId !")
+ } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+ // As a heuristic, assume a single executor failure to infer the number of replicas
+ // that existed before the failure.
+ val maxReplicas = locations.size + 1
+ val i = (new Random(blockId.hashCode)).nextInt(locations.size)
+ val blockLocations = locations.toSeq
+ val candidateBMId = blockLocations(i)
+ blockManagerInfo.get(candidateBMId).foreach { bm =>
+ val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
+ val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
+ bm.slaveEndpoint.ask[Boolean](replicateMsg)
+ }
}
}
+
listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
logInfo(s"Removing block manager $blockManagerId")
+
}
private def removeExecutor(execId: String) {
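On the master side, the survivor asked to re-replicate is chosen with a random index seeded by the block's hash, so repeated removals pick the same source deterministically. A standalone sketch of that selection, with plain `String`s standing in for `BlockManagerId`:

```scala
import scala.util.Random

// Hypothetical helper: locations are the surviving holders of a block.
def pickReplicationSource(blockIdHash: Int, locations: Seq[String]): (String, Seq[String]) = {
  // Seeding with the block's hash mirrors `new Random(blockId.hashCode)`
  // in the patch and makes the choice repeatable for a given block.
  val i = new Random(blockIdHash).nextInt(locations.size)
  val candidate = locations(i)
  // The remaining survivors are sent along so the source never places a
  // new copy on a node that already has one.
  (candidate, locations.filter(_ != candidate))
}

val (source, remaining) = pickReplicationSource("rdd_0_1".hashCode, Seq("bm-1", "bm-2", "bm-3"))
// `source` then receives ReplicateBlock(blockId, remaining, maxReplicas =
// locations.size + 1) on its slave endpoint, as in the code above.
```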
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index d71acbb4cf..0aea438e7f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -32,6 +32,10 @@ private[spark] object BlockManagerMessages {
// blocks that the master knows about.
case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave
+ // Replicate blocks that were lost due to executor failure
+ case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
+ extends ToBlockManagerSlave
+
// Remove all blocks belonging to a specific RDD.
case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
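For illustration only, here is how the new message would be built; note that `BlockManagerMessages` is `private[spark]`, so this sketch only compiles inside the `org.apache.spark` namespace, and the executor ids and ports below are made up:

```scala
import org.apache.spark.storage.{BlockManagerId, RDDBlockId}
import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock

// Hypothetical values: a cached RDD partition and its two surviving holders.
val blockId = RDDBlockId(0, 1)
val survivors = Seq(
  BlockManagerId("exec-1", "host-1", 7077),
  BlockManagerId("exec-2", "host-2", 7077))

// Restore the pre-failure replica count: survivors plus the one lost copy.
val msg = ReplicateBlock(blockId, survivors, maxReplicas = survivors.size + 1)
```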
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index d17ddbc162..1aaa42459d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -74,6 +74,10 @@ class BlockManagerSlaveEndpoint(
case TriggerThreadDump =>
context.reply(Utils.getThreadDump())
+
+ case ReplicateBlock(blockId, replicas, maxReplicas) =>
+ context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))
+
}
private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {