author     Tathagata Das <tathagata.das1565@gmail.com>    2014-10-02 13:49:47 -0700
committer  Reynold Xin <rxin@apache.org>                  2014-10-02 13:49:47 -0700
commit     5db78e6b87d33ac2d48a997e69b46e9be3b63137 (patch)
tree       73e39b6f7c2bddb8256a546781d998318289e018 /core/src/main/scala
parent     c6469a02f14e8c23e9b4e1336768f8bbfc15f5d8 (diff)
[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target
If a block manager (say, A) wants to replicate a block and the node chosen for replication (say, B) is dead, then the attempt to send the block to B fails. However, this continues to fail indefinitely. Even if the driver learns about the demise of B, A continues to try replicating to B and failing. The reason behind this bug is that A initially fetches a list of peers from the driver (when B was active), but never updates it after B is dead. This affects Spark Streaming, as its receiver uses block replication.

The solution in this patch adds the following.
- Changed BlockManagerMaster to return all the peers of a block manager, rather than the requested number. It also filters out the driver BlockManager.
- Refactored BlockManager's replication code to handle peer caching correctly.
  + The peer for replication is randomly selected. This is different from past behavior where, for a node A, a node B was deterministically chosen for the lifetime of the application.
  + If replication to a node fails, the peers are re-fetched.
  + The peer cache has a TTL (60 seconds by default, configurable via spark.storage.cachedPeersTtl) to enable discovery of new peers and using them for replication.
- Refactored use of <driver> in BlockManager into a new method `BlockManagerId.isDriver`.
- Added replication unit tests (replication was not tested till now, duh!)

This should not make a difference in performance of Spark workloads where replication is not used.

@andrewor14 @JoshRosen

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2366 from tdas/replication-fix and squashes the following commits:

9690f57 [Tathagata Das] Moved replication tests to a new BlockManagerReplicationSuite.
0661773 [Tathagata Das] Minor changes based on PR comments.
a55a65c [Tathagata Das] Added a unit test to test replication behavior.
012afa3 [Tathagata Das] Bug fix
89f91a0 [Tathagata Das] Minor change.
68e2c72 [Tathagata Das] Made replication peer selection logic more efficient.
08afaa9 [Tathagata Das] Made peer selection for replication deterministic to block id
3821ab9 [Tathagata Das] Fixes based on PR comments.
08e5646 [Tathagata Das] More minor changes.
d402506 [Tathagata Das] Fixed imports.
4a20531 [Tathagata Das] Filtered driver block manager from peer list, and also consolidated the use of <driver> in BlockManager.
7598f91 [Tathagata Das] Minor changes.
03de02d [Tathagata Das] Change replication logic to correctly refetch peers from master on failure and on new worker addition.
d081bf6 [Tathagata Das] Fixed bug in get peers and unit tests to test get-peers and replication under executor churn.
9f0ac9f [Tathagata Das] Modified replication tests to fail on replication bug.
af0c1da [Tathagata Das] Added replication unit tests to BlockManagerSuite
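Note for users picking up this change: the patch reads two new, previously nonexistent configuration keys, spark.storage.cachedPeersTtl and spark.storage.maxReplicationFailures (see the BlockManager diff below). A minimal SparkConf sketch for tuning them; only the key names and the defaults noted in the comments come from the patch, the values set here are purely illustrative.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // How long (in ms) a fetched peer list stays valid before being refreshed (default: 60000)
      .set("spark.storage.cachedPeersTtl", "10000")
      // How many failed upload attempts are tolerated before replication gives up (default: 1)
      .set("spark.storage.maxReplicationFailures", "2")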
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala             122
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala             2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala          9
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala    29
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala        2
5 files changed, 122 insertions, 42 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d1bee3d2c0..3f5d06e1ae 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -22,6 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.concurrent.ExecutionContext.Implicits.global
+import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
@@ -112,6 +113,11 @@ private[spark] class BlockManager(
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+ // Field related to peer block managers that are necessary for block replication
+ @volatile private var cachedPeers: Seq[BlockManagerId] = _
+ private val peerFetchLock = new Object
+ private var lastPeerFetchTime = 0L
+
initialize()
/* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -787,31 +793,111 @@ private[spark] class BlockManager(
}
/**
- * Replicate block to another node.
+ * Get peer block managers in the system.
+ */
+ private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+ peerFetchLock.synchronized {
+ val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+ val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+ if (cachedPeers == null || forceFetch || timeout) {
+ cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+ lastPeerFetchTime = System.currentTimeMillis
+ logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+ }
+ cachedPeers
+ }
+ }
+
+ /**
+ * Replicate block to another node. Note that this is a blocking call that returns after
+ * the block has been replicated.
*/
- @volatile var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+ val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+ val numPeersToReplicateTo = level.replication - 1
+ val peersForReplication = new ArrayBuffer[BlockManagerId]
+ val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+ val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
- if (cachedPeers == null) {
- cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+ val startTime = System.currentTimeMillis
+ val random = new Random(blockId.hashCode)
+
+ var replicationFailed = false
+ var failures = 0
+ var done = false
+
+ // Get cached list of peers
+ peersForReplication ++= getPeers(forceFetch = false)
+
+ // Get a random peer. Note that this selection of a peer is deterministic on the block id.
+ // So assuming the list of peers does not change and no replication failures,
+ // if there are multiple attempts in the same node to replicate the same block,
+ // the same set of peers will be selected.
+ def getRandomPeer(): Option[BlockManagerId] = {
+ // If replication had failed, then force update the cached list of peers and remove the peers
+ // that have been already used
+ if (replicationFailed) {
+ peersForReplication.clear()
+ peersForReplication ++= getPeers(forceFetch = true)
+ peersForReplication --= peersReplicatedTo
+ peersForReplication --= peersFailedToReplicateTo
+ }
+ if (!peersForReplication.isEmpty) {
+ Some(peersForReplication(random.nextInt(peersForReplication.size)))
+ } else {
+ None
+ }
}
- for (peer: BlockManagerId <- cachedPeers) {
- val start = System.nanoTime
- data.rewind()
- logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
- s"To node: $peer")
- try {
- blockTransferService.uploadBlockSync(
- peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
- } catch {
- case e: Exception =>
- logError(s"Failed to replicate block to $peer", e)
+ // One by one choose a random peer and try uploading the block to it
+ // If replication fails (e.g., target peer is down), force the list of cached peers
+ // to be re-fetched from driver and then pick another random peer for replication. Also
+ // temporarily black list the peer for which replication failed.
+ //
+ // This selection of a peer and replication is continued in a loop until one of the
+ // following 3 conditions is fulfilled:
+ // (i) specified number of peers have been replicated to
+ // (ii) too many failures in replicating to peers
+ // (iii) no peer left to replicate to
+ //
+ while (!done) {
+ getRandomPeer() match {
+ case Some(peer) =>
+ try {
+ val onePeerStartTime = System.currentTimeMillis
+ data.rewind()
+ logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+ blockTransferService.uploadBlockSync(
+ peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+ logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
+ .format((System.currentTimeMillis - onePeerStartTime)))
+ peersReplicatedTo += peer
+ peersForReplication -= peer
+ replicationFailed = false
+ if (peersReplicatedTo.size == numPeersToReplicateTo) {
+ done = true // specified number of peers have been replicated to
+ }
+ } catch {
+ case e: Exception =>
+ logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+ failures += 1
+ replicationFailed = true
+ peersFailedToReplicateTo += peer
+ if (failures > maxReplicationFailures) { // too many failures in replicating to peers
+ done = true
+ }
+ }
+ case None => // no peer left to replicate to
+ done = true
}
-
- logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
- .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
+ }
+ val timeTakeMs = (System.currentTimeMillis - startTime)
+ logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
+ s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
+ if (peersReplicatedTo.size < numPeersToReplicateTo) {
+ logWarning(s"Block $blockId replicated to only " +
+ s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
}
}
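The comment block in the hunk above spells out the retry loop's three exit conditions: enough peers replicated to, too many failures, or no candidates left. The following is a self-contained sketch of that pattern, not the actual BlockManager code; Peer and the fetchPeers/uploadTo callbacks are stand-ins for BlockManagerId, getPeers and uploadBlockSync.

    import scala.collection.mutable.ArrayBuffer
    import scala.util.Random

    case class Peer(host: String, port: Int)

    def replicateSketch(
        fetchPeers: () => Seq[Peer],   // stands in for getPeers(forceFetch)
        uploadTo: Peer => Boolean,     // stands in for uploadBlockSync; false means failure
        numPeersToReplicateTo: Int,
        maxFailures: Int,
        blockIdHash: Int): Seq[Peer] = {
      val random = new Random(blockIdHash)          // peer choice is deterministic on the block id
      val candidates = ArrayBuffer(fetchPeers(): _*)
      val replicatedTo = ArrayBuffer.empty[Peer]
      val failedTo = ArrayBuffer.empty[Peer]
      var failures = 0
      var done = false
      while (!done) {
        if (candidates.isEmpty) {
          done = true                               // (iii) no peer left to replicate to
        } else {
          val peer = candidates(random.nextInt(candidates.size))
          if (uploadTo(peer)) {
            replicatedTo += peer
            candidates -= peer
            if (replicatedTo.size == numPeersToReplicateTo) {
              done = true                           // (i) specified number of peers reached
            }
          } else {
            failures += 1
            failedTo += peer
            // Refresh the candidate list and drop peers already used or already failed
            candidates.clear()
            candidates ++= fetchPeers()
            candidates --= replicatedTo
            candidates --= failedTo
            if (failures > maxFailures) {
              done = true                           // (ii) too many failures
            }
          }
        }
      }
      replicatedTo.toSeq
    }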
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index d4487fce49..1422850943 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -59,6 +59,8 @@ class BlockManagerId private (
def port: Int = port_
+ def isDriver: Boolean = (executorId == "<driver>")
+
override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 2e262594b3..d08e1419e3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -84,13 +84,8 @@ class BlockManagerMaster(
}
/** Get ids of other nodes in the cluster from the driver */
- def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
- val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
- if (result.length != numPeers) {
- throw new SparkException(
- "Error getting peers, only got " + result.size + " instead of " + numPeers)
- }
- result
+ def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+ askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 1a6c7cb24f..6a06257ed0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetLocationsMultipleBlockIds(blockIds) =>
sender ! getLocationsMultipleBlockIds(blockIds)
- case GetPeers(blockManagerId, size) =>
- sender ! getPeers(blockManagerId, size)
+ case GetPeers(blockManagerId) =>
+ sender ! getPeers(blockManagerId)
case GetMemoryStatus =>
sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
* from the executors, but not from the driver.
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
- // TODO: Consolidate usages of <driver>
import context.dispatcher
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
- removeFromDriver || info.blockManagerId.executorId != "<driver>"
+ removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
@@ -212,7 +211,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
- if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
+ if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
@@ -232,7 +231,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
*/
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
- blockManagerId.executorId == "<driver>" && !isLocal
+ blockManagerId.isDriver && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
tachyonSize: Long) {
if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.executorId == "<driver>" && !isLocal) {
+ if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
@@ -403,16 +402,14 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockIds.map(blockId => getLocations(blockId))
}
- private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
- val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
- val selfIndex = peers.indexOf(blockManagerId)
- if (selfIndex == -1) {
- throw new SparkException("Self index for " + blockManagerId + " not found")
+ /** Get the list of the peers of the given block manager */
+ private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+ val blockManagerIds = blockManagerInfo.keySet
+ if (blockManagerIds.contains(blockManagerId)) {
+ blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
+ } else {
+ Seq.empty
}
-
- // Note that this logic will select the same node multiple times if there aren't enough peers
- Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
}
}
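To make the behavior change above concrete, here is a small illustrative check of the new getPeers semantics, with plain strings standing in for BlockManagerId values (the real method uses isDriver rather than a literal executor name):

    val registered = Set("<driver>", "exec-1", "exec-2", "exec-3")

    def peersOf(self: String): Seq[String] =
      if (registered.contains(self)) {
        registered.filterNot(_ == "<driver>").filterNot(_ == self).toSeq
      } else {
        Seq.empty  // unknown block manager: empty list instead of the old SparkException
      }

    // peersOf("exec-1") returns exec-2 and exec-3 (never the driver or exec-1 itself)
    // peersOf("exec-9") returns an empty Seq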
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 2ba16b8476..3db5dd9774 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -88,7 +88,7 @@ private[spark] object BlockManagerMessages {
case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
- case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+ case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
case class RemoveExecutor(execId: String) extends ToBlockManagerMaster