Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  167
1 file changed, 82 insertions(+), 85 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index aa29acfd70..982b83324e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,7 +20,8 @@ package org.apache.spark.storage
import java.io._
import java.nio.ByteBuffer
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable
+import scala.collection.mutable.HashMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.reflect.ClassTag
@@ -44,6 +45,7 @@ import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
+
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
@@ -147,6 +149,8 @@ private[spark] class BlockManager(
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L
+ private var blockReplicationPolicy: BlockReplicationPolicy = _
+
/**
* Initializes the BlockManager with the given appId. This is not performed in the constructor as
* the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -160,8 +164,24 @@ private[spark] class BlockManager(
blockTransferService.init(this)
shuffleClient.init(appId)
- blockManagerId = BlockManagerId(
- executorId, blockTransferService.hostName, blockTransferService.port)
+ blockReplicationPolicy = {
+ val priorityClass = conf.get(
+ "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
+ val clazz = Utils.classForName(priorityClass)
+ val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
+ logInfo(s"Using $priorityClass for block replication policy")
+ ret
+ }
+
+ val id =
+ BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
+
+ val idFromMaster = master.registerBlockManager(
+ id,
+ maxMemory,
+ slaveEndpoint)
+
+ blockManagerId = if (idFromMaster != null) idFromMaster else id
shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
@@ -170,12 +190,12 @@ private[spark] class BlockManager(
blockManagerId
}
- master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
-
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}
+
+ logInfo(s"Initialized BlockManager: $blockManagerId")
}
private def registerWithExternalShuffleServer() {
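[Note: two things change in initialize() above: the block replication policy becomes pluggable, and registration now round-trips through the master, which may hand back an amended BlockManagerId (BlockManagerId(...) gains a fourth argument, passed as None here). The idFromMaster != null check keeps the locally built id as a fallback, so blockManagerId is always set before the shuffle-server id is chosen. A minimal sketch of the new configuration surface, using only the key names visible in this diff; setting the default class explicitly is purely illustrative:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Pluggable peer prioritization, read in initialize() above;
      // RandomBlockReplicationPolicy is the default named there.
      .set("spark.storage.replication.policy",
        "org.apache.spark.storage.RandomBlockReplicationPolicy")
      // Failure budget read by replicate() further down in this diff.
      .set("spark.storage.maxReplicationFailures", "1")
]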
@@ -1111,7 +1131,7 @@ private[spark] class BlockManager(
}
/**
- * Replicate block to another node. Not that this is a blocking call that returns after
+ * Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*/
private def replicate(
@@ -1119,101 +1139,78 @@ private[spark] class BlockManager(
data: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Unit = {
+
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
- val numPeersToReplicateTo = level.replication - 1
- val peersForReplication = new ArrayBuffer[BlockManagerId]
- val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
- val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
useDisk = level.useDisk,
useMemory = level.useMemory,
useOffHeap = level.useOffHeap,
deserialized = level.deserialized,
replication = 1)
- val startTime = System.currentTimeMillis
- val random = new Random(blockId.hashCode)
-
- var replicationFailed = false
- var failures = 0
- var done = false
-
- // Get cached list of peers
- peersForReplication ++= getPeers(forceFetch = false)
-
- // Get a random peer. Note that this selection of a peer is deterministic on the block id.
- // So assuming the list of peers does not change and no replication failures,
- // if there are multiple attempts in the same node to replicate the same block,
- // the same set of peers will be selected.
- def getRandomPeer(): Option[BlockManagerId] = {
- // If replication had failed, then force update the cached list of peers and remove the peers
- // that have been already used
- if (replicationFailed) {
- peersForReplication.clear()
- peersForReplication ++= getPeers(forceFetch = true)
- peersForReplication --= peersReplicatedTo
- peersForReplication --= peersFailedToReplicateTo
- }
- if (!peersForReplication.isEmpty) {
- Some(peersForReplication(random.nextInt(peersForReplication.size)))
- } else {
- None
- }
- }
- // One by one choose a random peer and try uploading the block to it
- // If replication fails (e.g., target peer is down), force the list of cached peers
- // to be re-fetched from driver and then pick another random peer for replication. Also
- // temporarily black list the peer for which replication failed.
- //
- // This selection of a peer and replication is continued in a loop until one of the
- // following 3 conditions is fulfilled:
- // (i) specified number of peers have been replicated to
- // (ii) too many failures in replicating to peers
- // (iii) no peer left to replicate to
- //
- while (!done) {
- getRandomPeer() match {
- case Some(peer) =>
- try {
- val onePeerStartTime = System.currentTimeMillis
- logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
- blockTransferService.uploadBlockSync(
- peer.host,
- peer.port,
- peer.executorId,
- blockId,
- new NettyManagedBuffer(data.toNetty),
- tLevel,
- classTag)
- logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
- .format(System.currentTimeMillis - onePeerStartTime))
- peersReplicatedTo += peer
- peersForReplication -= peer
- replicationFailed = false
- if (peersReplicatedTo.size == numPeersToReplicateTo) {
- done = true // specified number of peers have been replicated to
- }
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
- failures += 1
- replicationFailed = true
- peersFailedToReplicateTo += peer
- if (failures > maxReplicationFailures) { // too many failures in replicating to peers
- done = true
- }
+ val numPeersToReplicateTo = level.replication - 1
+
+ val startTime = System.nanoTime
+
+ var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+ var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
+ var numFailures = 0
+
+ var peersForReplication = blockReplicationPolicy.prioritize(
+ blockManagerId,
+ getPeers(false),
+ mutable.HashSet.empty,
+ blockId,
+ numPeersToReplicateTo)
+
+ while (numFailures <= maxReplicationFailures &&
+ !peersForReplication.isEmpty &&
+ peersReplicatedTo.size != numPeersToReplicateTo) {
+ val peer = peersForReplication.head
+ try {
+ val onePeerStartTime = System.nanoTime
+ logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
+ blockTransferService.uploadBlockSync(
+ peer.host,
+ peer.port,
+ peer.executorId,
+ blockId,
+ new NettyManagedBuffer(data.toNetty),
+ tLevel,
+ classTag)
+ logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
+ s" in ${(System.nanoTime - onePeerStartTime).toDouble / 1e6} ms")
+ peersForReplication = peersForReplication.tail
+ peersReplicatedTo += peer
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
+ peersFailedToReplicateTo += peer
+ // Replication failed, so refresh the list of peers from the master,
+ // excluding peers we have already replicated to and peers that have
+ // failed before.
+ val filteredPeers = getPeers(true).filter { p =>
+ !peersFailedToReplicateTo.contains(p) && !peersReplicatedTo.contains(p)
}
- case None => // no peer left to replicate to
- done = true
+
+ numFailures += 1
+ peersForReplication = blockReplicationPolicy.prioritize(
+ blockManagerId,
+ filteredPeers,
+ peersReplicatedTo,
+ blockId,
+ numPeersToReplicateTo - peersReplicatedTo.size)
}
}
- val timeTakeMs = (System.currentTimeMillis - startTime)
+
logDebug(s"Replicating $blockId of ${data.size} bytes to " +
- s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
+ s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
}
+
+ logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
}
/**
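[Note: the rewritten loop above consumes the prioritized peer list head-first and, on a failure, re-invokes prioritize with the failed peer filtered out, so a policy only ranks candidates while BlockManager keeps the retry bookkeeping. For illustration, a minimal sketch of a custom policy follows; the trait signature is inferred from the prioritize(...) call sites in this diff, and the class name ShuffledReplicationPolicy is hypothetical, not part of the change:

    import scala.collection.mutable
    import scala.util.Random

    import org.apache.spark.storage.{BlockId, BlockManagerId, BlockReplicationPolicy}

    // Hypothetical example policy; not part of this commit.
    class ShuffledReplicationPolicy extends BlockReplicationPolicy {
      override def prioritize(
          blockManagerId: BlockManagerId,
          peers: Seq[BlockManagerId],
          peersReplicatedTo: mutable.HashSet[BlockManagerId],
          blockId: BlockId,
          numReplicas: Int): List[BlockManagerId] = {
        // Seed on the block id so repeated attempts for the same block see the
        // same ordering, matching the determinism note in the removed code above.
        val random = new Random(blockId.hashCode)
        random.shuffle(peers.toList).take(numReplicas)
      }
    }

Such a class would be selected through spark.storage.replication.policy, since initialize() instantiates whatever class that key names via a no-argument constructor (clazz.newInstance above).]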