-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                 | 43
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala   | 24
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala         |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala    |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 95
-rw-r--r--  docs/configuration.md                                                            |  9
6 files changed, 154 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6946a98cdd..45b7338080 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1160,6 +1160,34 @@ private[spark] class BlockManager(
}
/**
+ * Called for pro-active replenishment of blocks lost due to executor failures.
+ *
+ * @param blockId blockId of the block being replicated
+ * @param existingReplicas existing block managers that have a replica
+ * @param maxReplicas maximum replicas needed
+ */
+ def replicateBlock(
+ blockId: BlockId,
+ existingReplicas: Set[BlockManagerId],
+ maxReplicas: Int): Unit = {
+ logInfo(s"Pro-actively replicating $blockId")
+ blockInfoManager.lockForReading(blockId).foreach { info =>
+ val data = doGetLocalBytes(blockId, info)
+ val storageLevel = StorageLevel(
+ useDisk = info.level.useDisk,
+ useMemory = info.level.useMemory,
+ useOffHeap = info.level.useOffHeap,
+ deserialized = info.level.deserialized,
+ replication = maxReplicas)
+ try {
+ replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
+ } finally {
+ releaseLock(blockId)
+ }
+ }
+ }
+
+ /**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*/
@@ -1167,7 +1195,8 @@ private[spark] class BlockManager(
blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
- classTag: ClassTag[_]): Unit = {
+ classTag: ClassTag[_],
+ existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val tLevel = StorageLevel(
@@ -1181,20 +1210,22 @@ private[spark] class BlockManager(
val startTime = System.nanoTime
- var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+ var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
var numFailures = 0
+ val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))
+
var peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId,
- getPeers(false),
- mutable.HashSet.empty,
+ initialPeers,
+ peersReplicatedTo,
blockId,
numPeersToReplicateTo)
while(numFailures <= maxReplicationFailures &&
- !peersForReplication.isEmpty &&
- peersReplicatedTo.size != numPeersToReplicateTo) {
+ !peersForReplication.isEmpty &&
+ peersReplicatedTo.size < numPeersToReplicateTo) {
val peer = peersForReplication.head
try {
val onePeerStartTime = System.nanoTime
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 145c434a4f..84c04d2260 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -22,6 +22,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
@@ -65,6 +66,8 @@ class BlockManagerMasterEndpoint(
mapper
}
+ val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
logInfo("BlockManagerMasterEndpoint up")
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -195,17 +198,38 @@ class BlockManagerMasterEndpoint(
// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
+
val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
locations -= blockManagerId
+ // De-register the block if none of the block managers have it. Otherwise, if pro-active
+ // replication is enabled, and a block is either an RDD or a test block (the latter is used
+ // for unit testing), we send a message to a randomly chosen executor location to replicate
+ // the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
+ // etc.) as replication doesn't make much sense in that context.
if (locations.size == 0) {
blockLocations.remove(blockId)
+ logWarning(s"No more replicas available for $blockId !")
+ } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+ // As a heuristic, assume a single executor failure to estimate the number of
+ // replicas that existed before the failure.
+ val maxReplicas = locations.size + 1
+ val i = (new Random(blockId.hashCode)).nextInt(locations.size)
+ val blockLocations = locations.toSeq
+ val candidateBMId = blockLocations(i)
+ blockManagerInfo.get(candidateBMId).foreach { bm =>
+ val remainingLocations = locations.toSeq.filter(_ != candidateBMId)
+ val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
+ bm.slaveEndpoint.ask[Boolean](replicateMsg)
+ }
}
}
+
listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
logInfo(s"Removing block manager $blockManagerId")
+
}
private def removeExecutor(execId: String) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index d71acbb4cf..0aea438e7f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -32,6 +32,10 @@ private[spark] object BlockManagerMessages {
// blocks that the master knows about.
case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave
+ // Replicate blocks that were lost due to executor failure
+ case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
+ extends ToBlockManagerSlave
+
// Remove all blocks belonging to a specific RDD.
case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index d17ddbc162..1aaa42459d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -74,6 +74,10 @@ class BlockManagerSlaveEndpoint(
case TriggerThreadDump =>
context.reply(Utils.getThreadDump())
+
+ case ReplicateBlock(blockId, replicas, maxReplicas) =>
+ context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))
+
}
private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index f4bfdc2fd6..ccede34b8c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -37,32 +37,31 @@ import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
-/** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite
- with Matchers
- with BeforeAndAfter
- with LocalSparkContext {
-
- private val conf = new SparkConf(false).set("spark.app.id", "test")
- private var rpcEnv: RpcEnv = null
- private var master: BlockManagerMaster = null
- private val securityMgr = new SecurityManager(conf)
- private val bcastManager = new BroadcastManager(true, conf, securityMgr)
- private val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
- private val shuffleManager = new SortShuffleManager(conf)
+trait BlockManagerReplicationBehavior extends SparkFunSuite
+ with Matchers
+ with BeforeAndAfter
+ with LocalSparkContext {
+
+ val conf: SparkConf
+ protected var rpcEnv: RpcEnv = null
+ protected var master: BlockManagerMaster = null
+ protected lazy val securityMgr = new SecurityManager(conf)
+ protected lazy val bcastManager = new BroadcastManager(true, conf, securityMgr)
+ protected lazy val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
+ protected lazy val shuffleManager = new SortShuffleManager(conf)
// List of block managers created during a unit test, so that all of them can be stopped
// after the unit test.
- private val allStores = new ArrayBuffer[BlockManager]
+ protected val allStores = new ArrayBuffer[BlockManager]
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
- conf.set("spark.kryoserializer.buffer", "1m")
- private val serializer = new KryoSerializer(conf)
+
+ protected lazy val serializer = new KryoSerializer(conf)
// Implicitly convert strings to BlockIds for test clarity.
- private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+ protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
- private def makeBlockManager(
+ protected def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
conf.set("spark.testing.memory", maxMem.toString)
@@ -355,7 +354,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite
* is correct. Then it also drops the block from memory of each store (using LRU) and
* again checks whether the master's knowledge gets updated.
*/
- private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
+ protected def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
import org.apache.spark.storage.StorageLevel._
assert(maxReplication > 1,
@@ -448,3 +447,61 @@ class BlockManagerReplicationSuite extends SparkFunSuite
}
}
}
+
+class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior {
+ val conf = new SparkConf(false).set("spark.app.id", "test")
+ conf.set("spark.kryoserializer.buffer", "1m")
+}
+
+class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
+ val conf = new SparkConf(false).set("spark.app.id", "test")
+ conf.set("spark.kryoserializer.buffer", "1m")
+ conf.set("spark.storage.replication.proactive", "true")
+ conf.set("spark.storage.exceptionOnPinLeak", "true")
+
+ (2 to 5).foreach { i =>
+ test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
+ testProactiveReplication(i)
+ }
+ }
+
+ def testProactiveReplication(replicationFactor: Int) {
+ val blockSize = 1000
+ val storeSize = 10000
+ val initialStores = (1 to 10).map { i => makeBlockManager(storeSize, s"store$i") }
+
+ val blockId = "a1"
+
+ val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
+ initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+
+ val blockLocations = master.getLocations(blockId)
+ logInfo(s"Initial locations : $blockLocations")
+
+ assert(blockLocations.size === replicationFactor)
+
+ // remove all but one of the block managers hosting the block
+ val executorsToRemove = blockLocations.take(replicationFactor - 1)
+ logInfo(s"Removing $executorsToRemove")
+ executorsToRemove.foreach { exec =>
+ master.removeExecutor(exec.executorId)
+ // give enough time for replication to happen and for the new block to be reported to the master
+ Thread.sleep(200)
+ }
+
+ // give enough time for replication to complete and for the locks to be released
+ Thread.sleep(500)
+
+ val newLocations = master.getLocations(blockId).toSet
+ logInfo(s"New locations : $newLocations")
+ assert(newLocations.size === replicationFactor)
+ // there should only be one common block manager between initial and new locations
+ assert(newLocations.intersect(blockLocations.toSet).size === 1)
+
+ // check if all the read locks have been released
+ initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
+ val locks = bm.releaseAllLocksForTask(BlockInfo.NON_TASK_WRITER)
+ assert(locks.size === 0, "Read locks unreleased!")
+ }
+ }
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index 2fcb3a096a..63392a741a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1000,6 +1000,15 @@ Apart from these, the following properties are also available, and may be useful
storage space to unroll the new block in its entirety.
</td>
</tr>
+<tr>
+ <td><code>spark.storage.replication.proactive</code></td>
+ <td>false</td>
+ <td>
+ Enables proactive block replication for RDD blocks. Cached RDD block replicas lost due to
+ executor failures are replenished if there are any existing available replicas. This tries
+ to restore the replication level of the block to its initial value.
+ </td>
+</tr>
</table>
### Execution Behavior
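
As a usage sketch (not part of the diff above), the new spark.storage.replication.proactive setting can be enabled on a SparkConf before the SparkContext is created; the app name, master URL, and RDD below are illustrative placeholders only:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.storage.StorageLevel

  // Enable proactive replenishment of lost cached replicas (off by default).
  val conf = new SparkConf()
    .setAppName("proactive-replication-demo")       // placeholder app name
    .setMaster("local-cluster[3, 1, 1024]")         // placeholder multi-executor master; usually supplied by spark-submit
    .set("spark.storage.replication.proactive", "true")

  val sc = new SparkContext(conf)

  // Cache with a replication factor of 2; if an executor holding a replica is
  // lost, the master asks a surviving holder to re-replicate the block.
  val rdd = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY_2)
  rdd.count()

  sc.stop()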