author     Tathagata Das <tathagata.das1565@gmail.com>  2014-10-02 13:49:47 -0700
committer  Reynold Xin <rxin@apache.org>                2014-10-02 13:49:47 -0700
commit     5db78e6b87d33ac2d48a997e69b46e9be3b63137 (patch)
tree       73e39b6f7c2bddb8256a546781d998318289e018 /core
parent     c6469a02f14e8c23e9b4e1336768f8bbfc15f5d8 (diff)
[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target
If a block manager (say, A) wants to replicate a block and the node chosen for replication (say, B) is dead, then the attempt to send the block to B fails. However, it continues to fail indefinitely: even after the driver learns about the demise of B, A keeps trying to replicate to B and failing. The reason behind this bug is that A initially fetches a list of peers from the driver (when B was active) but never updates it after B is dead. This affects Spark Streaming, as its receivers use block replication.

The solution in this patch adds the following.
- Changed BlockManagerMaster to return all the peers of a block manager, rather than the requested number. It also filters out the driver BlockManager.
- Refactored BlockManager's replication code to handle peer caching correctly.
  + The peer for replication is randomly selected. This is different from past behavior, where for a node A, a node B was deterministically chosen for the lifetime of the application.
  + If replication to a node fails, the peers are re-fetched.
  + The cached peer list has a TTL (spark.storage.cachedPeersTtl, 60 seconds by default) to enable discovery of new peers and using them for replication.
- Refactored the use of <driver> in BlockManager into a new method `BlockManagerId.isDriver`.
- Added replication unit tests (replication was not tested till now, duh!)

This should not make a difference in the performance of Spark workloads where replication is not used.

@andrewor14 @JoshRosen

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2366 from tdas/replication-fix and squashes the following commits:

9690f57 [Tathagata Das] Moved replication tests to a new BlockManagerReplicationSuite.
0661773 [Tathagata Das] Minor changes based on PR comments.
a55a65c [Tathagata Das] Added a unit test to test replication behavior.
012afa3 [Tathagata Das] Bug fix
89f91a0 [Tathagata Das] Minor change.
68e2c72 [Tathagata Das] Made replication peer selection logic more efficient.
08afaa9 [Tathagata Das] Made peer selection for replication deterministic to block id
3821ab9 [Tathagata Das] Fixes based on PR comments.
08e5646 [Tathagata Das] More minor changes.
d402506 [Tathagata Das] Fixed imports.
4a20531 [Tathagata Das] Filtered driver block manager from peer list, and also consolidated the use of <driver> in BlockManager.
7598f91 [Tathagata Das] Minor changes.
03de02d [Tathagata Das] Change replication logic to correctly refetch peers from master on failure and on new worker addition.
d081bf6 [Tathagata Das] Fixed bug in get peers and unit tests to test get-peers and replication under executor churn.
9f0ac9f [Tathagata Das] Modified replication tests to fail on replication bug.
af0c1da [Tathagata Das] Added replication unit tests to BlockManagerSuite
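For context, here is a minimal usage sketch (not part of this patch) showing how the new behavior is exercised. It assumes an application with at least two executors; the two spark.storage.* keys are the configuration knobs read by this patch, shown with the defaults used in the code below.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    // Sketch only (assumed application setup, not code from this patch).
    val conf = new SparkConf()
      .setAppName("replication-example")
      .setMaster("local-cluster[2, 1, 512]")             // two executors, so a peer exists to replicate to
      .set("spark.storage.cachedPeersTtl", "60000")      // peer-cache TTL in ms (default in this patch)
      .set("spark.storage.maxReplicationFailures", "1")  // failures tolerated before giving up (default)
    val sc = new SparkContext(conf)

    // Any storage level with replication > 1 goes through BlockManager.replicate();
    // MEMORY_ONLY_2 asks for one extra copy on a peer block manager.
    sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY_2).count()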
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                 | 122
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala               |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala           |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala      |  29
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala         |   2
-rw-r--r--  core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala             |   2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 418
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala            |   9
8 files changed, 544 insertions, 49 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d1bee3d2c0..3f5d06e1ae 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -22,6 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.concurrent.ExecutionContext.Implicits.global
+import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
@@ -112,6 +113,11 @@ private[spark] class BlockManager(
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+ // Fields related to peer block managers that are necessary for block replication
+ @volatile private var cachedPeers: Seq[BlockManagerId] = _
+ private val peerFetchLock = new Object
+ private var lastPeerFetchTime = 0L
+
initialize()
/* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -787,31 +793,111 @@ private[spark] class BlockManager(
}
/**
- * Replicate block to another node.
+ * Get peer block managers in the system.
+ */
+ private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+ peerFetchLock.synchronized {
+ val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+ val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+ if (cachedPeers == null || forceFetch || timeout) {
+ cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+ lastPeerFetchTime = System.currentTimeMillis
+ logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+ }
+ cachedPeers
+ }
+ }
+
+ /**
+ * Replicate block to another node. Note that this is a blocking call that returns after
+ * the block has been replicated.
*/
- @volatile var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+ val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+ val numPeersToReplicateTo = level.replication - 1
+ val peersForReplication = new ArrayBuffer[BlockManagerId]
+ val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+ val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
- if (cachedPeers == null) {
- cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+ val startTime = System.currentTimeMillis
+ val random = new Random(blockId.hashCode)
+
+ var replicationFailed = false
+ var failures = 0
+ var done = false
+
+ // Get cached list of peers
+ peersForReplication ++= getPeers(forceFetch = false)
+
+ // Get a random peer. Note that this selection of a peer is deterministic on the block id.
+ // So assuming the list of peers does not change and no replication failures,
+ // if there are multiple attempts in the same node to replicate the same block,
+ // the same set of peers will be selected.
+ def getRandomPeer(): Option[BlockManagerId] = {
+ // If replication failed, then force-update the cached list of peers and remove the peers
+ // that have already been used
+ if (replicationFailed) {
+ peersForReplication.clear()
+ peersForReplication ++= getPeers(forceFetch = true)
+ peersForReplication --= peersReplicatedTo
+ peersForReplication --= peersFailedToReplicateTo
+ }
+ if (!peersForReplication.isEmpty) {
+ Some(peersForReplication(random.nextInt(peersForReplication.size)))
+ } else {
+ None
+ }
}
- for (peer: BlockManagerId <- cachedPeers) {
- val start = System.nanoTime
- data.rewind()
- logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
- s"To node: $peer")
- try {
- blockTransferService.uploadBlockSync(
- peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
- } catch {
- case e: Exception =>
- logError(s"Failed to replicate block to $peer", e)
+ // One by one choose a random peer and try uploading the block to it
+ // If replication fails (e.g., target peer is down), force the list of cached peers
+ // to be re-fetched from driver and then pick another random peer for replication. Also
+ // temporarily black list the peer for which replication failed.
+ //
+ // This selection of a peer and replication is continued in a loop until one of the
+ // following 3 conditions is fulfilled:
+ // (i) specified number of peers have been replicated to
+ // (ii) too many failures in replicating to peers
+ // (iii) no peer left to replicate to
+ //
+ while (!done) {
+ getRandomPeer() match {
+ case Some(peer) =>
+ try {
+ val onePeerStartTime = System.currentTimeMillis
+ data.rewind()
+ logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+ blockTransferService.uploadBlockSync(
+ peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+ logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
+ .format((System.currentTimeMillis - onePeerStartTime)))
+ peersReplicatedTo += peer
+ peersForReplication -= peer
+ replicationFailed = false
+ if (peersReplicatedTo.size == numPeersToReplicateTo) {
+ done = true // specified number of peers have been replicated to
+ }
+ } catch {
+ case e: Exception =>
+ logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+ failures += 1
+ replicationFailed = true
+ peersFailedToReplicateTo += peer
+ if (failures > maxReplicationFailures) { // too many failures in replicating to peers
+ done = true
+ }
+ }
+ case None => // no peer left to replicate to
+ done = true
}
-
- logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
- .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
+ }
+ val timeTakeMs = (System.currentTimeMillis - startTime)
+ logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
+ s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
+ if (peersReplicatedTo.size < numPeersToReplicateTo) {
+ logWarning(s"Block $blockId replicated to only " +
+ s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
}
}
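As an aside, here is a self-contained sketch (not taken from this patch) of the peer-selection idea used in replicate() above: seeding the random number generator with the block id's hash makes the choice deterministic per block, so repeated attempts to replicate the same block from the same node pick the same peers as long as the peer list is unchanged.

    import scala.collection.mutable.ArrayBuffer
    import scala.util.Random

    // Illustration only: deterministic-per-block random selection of replication targets.
    def choosePeers(blockId: String, peers: Seq[String], numReplicas: Int): Seq[String] = {
      val random = new Random(blockId.hashCode)   // same block id => same random sequence
      val remaining = ArrayBuffer(peers: _*)
      val chosen = ArrayBuffer.empty[String]
      while (chosen.size < numReplicas && remaining.nonEmpty) {
        val peer = remaining(random.nextInt(remaining.size))
        chosen += peer
        remaining -= peer
      }
      chosen
    }

    // Same block id and same peer list => same targets on every call:
    // choosePeers("input-0-1412345678900", Seq("exec1", "exec2", "exec3"), 2)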
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index d4487fce49..1422850943 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -59,6 +59,8 @@ class BlockManagerId private (
def port: Int = port_
+ def isDriver: Boolean = (executorId == "<driver>")
+
override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 2e262594b3..d08e1419e3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -84,13 +84,8 @@ class BlockManagerMaster(
}
/** Get ids of other nodes in the cluster from the driver */
- def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
- val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
- if (result.length != numPeers) {
- throw new SparkException(
- "Error getting peers, only got " + result.size + " instead of " + numPeers)
- }
- result
+ def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+ askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 1a6c7cb24f..6a06257ed0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetLocationsMultipleBlockIds(blockIds) =>
sender ! getLocationsMultipleBlockIds(blockIds)
- case GetPeers(blockManagerId, size) =>
- sender ! getPeers(blockManagerId, size)
+ case GetPeers(blockManagerId) =>
+ sender ! getPeers(blockManagerId)
case GetMemoryStatus =>
sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
* from the executors, but not from the driver.
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
- // TODO: Consolidate usages of <driver>
import context.dispatcher
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
- removeFromDriver || info.blockManagerId.executorId != "<driver>"
+ removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
@@ -212,7 +211,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
- if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
+ if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
@@ -232,7 +231,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
*/
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
- blockManagerId.executorId == "<driver>" && !isLocal
+ blockManagerId.isDriver && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
tachyonSize: Long) {
if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.executorId == "<driver>" && !isLocal) {
+ if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
@@ -403,16 +402,14 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockIds.map(blockId => getLocations(blockId))
}
- private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
- val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
- val selfIndex = peers.indexOf(blockManagerId)
- if (selfIndex == -1) {
- throw new SparkException("Self index for " + blockManagerId + " not found")
+ /** Get the list of the peers of the given block manager */
+ private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+ val blockManagerIds = blockManagerInfo.keySet
+ if (blockManagerIds.contains(blockManagerId)) {
+ blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
+ } else {
+ Seq.empty
}
-
- // Note that this logic will select the same node multiple times if there aren't enough peers
- Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 2ba16b8476..3db5dd9774 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -88,7 +88,7 @@ private[spark] object BlockManagerMessages {
case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
- case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+ case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 978a6ded80..acaf321de5 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
- assert(bm.executorId === "<driver>", "Block should only be on the driver")
+ assert(bm.isDriver, "Block should only be on the driver")
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store on the driver")
assert(status.diskSize === 0, "Block should not be in disk store on the driver")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
new file mode 100644
index 0000000000..1f1d53a1ee
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+
+import akka.actor.{ActorSystem, Props}
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.storage.StorageLevel._
+import org.apache.spark.util.{AkkaUtils, SizeEstimator}
+
+/** Testsuite that tests block replication in BlockManager */
+class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAndAfter {
+
+ private val conf = new SparkConf(false)
+ var actorSystem: ActorSystem = null
+ var master: BlockManagerMaster = null
+ val securityMgr = new SecurityManager(conf)
+ val mapOutputTracker = new MapOutputTrackerMaster(conf)
+ val shuffleManager = new HashShuffleManager(conf)
+
+ // List of block managers created during a unit test, so that all of them can be stopped
+ // after the unit test.
+ val allStores = new ArrayBuffer[BlockManager]
+
+ // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ conf.set("spark.kryoserializer.buffer.mb", "1")
+ val serializer = new KryoSerializer(conf)
+
+ // Implicitly convert strings to BlockIds for test clarity.
+ implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+
+ private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+ val transfer = new NioBlockTransferService(conf, securityMgr)
+ val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
+ mapOutputTracker, shuffleManager, transfer)
+ allStores += store
+ store
+ }
+
+ before {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ "test", "localhost", 0, conf = conf, securityManager = securityMgr)
+ this.actorSystem = actorSystem
+
+ conf.set("spark.authenticate", "false")
+ conf.set("spark.driver.port", boundPort.toString)
+ conf.set("spark.storage.unrollFraction", "0.4")
+ conf.set("spark.storage.unrollMemoryThreshold", "512")
+
+ // to make a replication attempt to an inactive store fail fast
+ conf.set("spark.core.connection.ack.wait.timeout", "1")
+ // to make cached peers refresh frequently
+ conf.set("spark.storage.cachedPeersTtl", "10")
+
+ master = new BlockManagerMaster(
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+ conf, true)
+ allStores.clear()
+ }
+
+ after {
+ allStores.foreach { _.stop() }
+ allStores.clear()
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ actorSystem = null
+ master = null
+ }
+
+
+ test("get peers with addition and removal of block managers") {
+ val numStores = 4
+ val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") }
+ val storeIds = stores.map { _.blockManagerId }.toSet
+ assert(master.getPeers(stores(0).blockManagerId).toSet ===
+ storeIds.filterNot { _ == stores(0).blockManagerId })
+ assert(master.getPeers(stores(1).blockManagerId).toSet ===
+ storeIds.filterNot { _ == stores(1).blockManagerId })
+ assert(master.getPeers(stores(2).blockManagerId).toSet ===
+ storeIds.filterNot { _ == stores(2).blockManagerId })
+
+ // Add driver store and test whether it is filtered out
+ val driverStore = makeBlockManager(1000, "<driver>")
+ assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+ assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+ assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+ // Add a new store and test whether get peers returns it
+ val newStore = makeBlockManager(1000, s"store$numStores")
+ assert(master.getPeers(stores(0).blockManagerId).toSet ===
+ storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId)
+ assert(master.getPeers(stores(1).blockManagerId).toSet ===
+ storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId)
+ assert(master.getPeers(stores(2).blockManagerId).toSet ===
+ storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId)
+ assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+ // Remove a store and test that get peers no longer returns it
+ val storeIdToRemove = stores(0).blockManagerId
+ master.removeExecutor(storeIdToRemove.executorId)
+ assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+ assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+ assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+ // Test whether asking for peers of an unregistered block manager id returns an empty list
+ assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+ assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+ }
+
+
+ test("block replication - 2x replication") {
+ testReplication(2,
+ Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2)
+ )
+ }
+
+ test("block replication - 3x replication") {
+ // Generate storage levels with 3x replication
+ val storageLevels = {
+ Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map {
+ level => StorageLevel(
+ level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3)
+ }
+ }
+ testReplication(3, storageLevels)
+ }
+
+ test("block replication - mixed between 1x to 5x") {
+ // Generate storage levels with varying replication
+ val storageLevels = Seq(
+ MEMORY_ONLY,
+ MEMORY_ONLY_SER_2,
+ StorageLevel(true, false, false, false, 3),
+ StorageLevel(true, true, false, true, 4),
+ StorageLevel(true, true, false, false, 5),
+ StorageLevel(true, true, false, true, 4),
+ StorageLevel(true, false, false, false, 3),
+ MEMORY_ONLY_SER_2,
+ MEMORY_ONLY
+ )
+ testReplication(5, storageLevels)
+ }
+
+ test("block replication - 2x replication without peers") {
+ intercept[org.scalatest.exceptions.TestFailedException] {
+ testReplication(1,
+ Seq(StorageLevel.MEMORY_AND_DISK_2, StorageLevel(true, false, false, false, 3)))
+ }
+ }
+
+ test("block replication - deterministic node selection") {
+ val blockSize = 1000
+ val storeSize = 10000
+ val stores = (1 to 5).map {
+ i => makeBlockManager(storeSize, s"store$i")
+ }
+ val storageLevel2x = StorageLevel.MEMORY_AND_DISK_2
+ val storageLevel3x = StorageLevel(true, true, false, true, 3)
+ val storageLevel4x = StorageLevel(true, true, false, true, 4)
+
+ def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
+ stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
+ val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
+ stores.foreach { _.removeBlock(blockId) }
+ master.removeBlock(blockId)
+ locations
+ }
+
+ // Test if two attempts to 2x replication returns same set of locations
+ val a1Locs = putBlockAndGetLocations("a1", storageLevel2x)
+ assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs,
+ "Inserting a 2x replicated block second time gave different locations from the first")
+
+ // Test if two attempts to 3x replication returns same set of locations
+ val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x)
+ assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x,
+ "Inserting a 3x replicated block second time gave different locations from the first")
+
+ // Test if 2x replication of a2 returns a strict subset of the locations of 3x replication
+ val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x)
+ assert(
+ a2Locs2x.subsetOf(a2Locs3x),
+ "Inserting a with 2x replication gave locations that are not a subset of locations" +
+ s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}"
+ )
+
+ // Test if 4x replication of a2 returns a strict superset of the locations of 3x replication
+ val a2Locs4x = putBlockAndGetLocations("a2", storageLevel4x)
+ assert(
+ a2Locs3x.subsetOf(a2Locs4x),
+ "Inserting a with 4x replication gave locations that are not a superset of locations " +
+ s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: ${a2Locs4x.mkString(",")}"
+ )
+
+ // Test if 3x replication of two different blocks gives two different sets of locations
+ val a3Locs3x = putBlockAndGetLocations("a3", storageLevel3x)
+ assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication")
+ }
+
+ test("block replication - replication failures") {
+ /*
+ Create a system of three block managers / stores. One of them (say, failableStore)
+ cannot receive blocks. So attempts to use that as replication target fails.
+
+ +-----------/fails/-----------> failableStore
+ |
+ normalStore
+ |
+ +-----------/works/-----------> anotherNormalStore
+
+ We are first going to add a normal block manager (i.e. normalStore) and the failable block
+ manager (i.e. failableStore), and test whether 2x replication fails to create two
+ copies of a block. Then we are going to add another normal block manager
+ (i.e., anotherNormalStore), and test that now 2x replication works as the
+ new store will be used for replication.
+ */
+
+ // Add a normal block manager
+ val store = makeBlockManager(10000, "store")
+
+ // Insert a block with 2x replication and return the number of copies of the block
+ def replicateAndGetNumCopies(blockId: String): Int = {
+ store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
+ val numLocations = master.getLocations(blockId).size
+ allStores.foreach { _.removeBlock(blockId) }
+ numLocations
+ }
+
+ // Add a failable block manager with a mock transfer service that does not
+ // allow receiving of blocks. So attempts to use it as a replication target will fail.
+ val failableTransfer = mock(classOf[BlockTransferService]) // this mock cannot actually transfer blocks
+ when(failableTransfer.hostName).thenReturn("some-hostname")
+ when(failableTransfer.port).thenReturn(1000)
+ val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
+ 10000, conf, mapOutputTracker, shuffleManager, failableTransfer)
+ allStores += failableStore // so that this gets stopped after test
+ assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
+
+ // Test that 2x replication fails by creating only one copy of the block
+ assert(replicateAndGetNumCopies("a1") === 1)
+
+ // Add another normal block manager and test that 2x replication works
+ makeBlockManager(10000, "anotherStore")
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ assert(replicateAndGetNumCopies("a2") === 2)
+ }
+ }
+
+ test("block replication - addition and deletion of block managers") {
+ val blockSize = 1000
+ val storeSize = 10000
+ val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
+
+ // Insert a block with a given replication factor and return the number of copies of the block
+ def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
+ val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
+ initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+ val numLocations = master.getLocations(blockId).size
+ allStores.foreach { _.removeBlock(blockId) }
+ numLocations
+ }
+
+ // 2x replication should work, 3x replication should only replicate 2x
+ assert(replicateAndGetNumCopies("a1", 2) === 2)
+ assert(replicateAndGetNumCopies("a2", 3) === 2)
+
+ // Add another store, 3x replication should work now, 4x replication should only replicate 3x
+ val newStore1 = makeBlockManager(storeSize, s"newstore1")
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ assert(replicateAndGetNumCopies("a3", 3) === 3)
+ }
+ assert(replicateAndGetNumCopies("a4", 4) === 3)
+
+ // Add another store, 4x replication should work now
+ val newStore2 = makeBlockManager(storeSize, s"newstore2")
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ assert(replicateAndGetNumCopies("a5", 4) === 4)
+ }
+
+ // Remove all but the 1st store, 2x replication should fail
+ (initialStores.tail ++ Seq(newStore1, newStore2)).foreach {
+ store =>
+ master.removeExecutor(store.blockManagerId.executorId)
+ store.stop()
+ }
+ assert(replicateAndGetNumCopies("a6", 2) === 1)
+
+ // Add new stores, 3x replication should work
+ val newStores = (3 to 5).map {
+ i => makeBlockManager(storeSize, s"newstore$i")
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ assert(replicateAndGetNumCopies("a7", 3) === 3)
+ }
+ }
+
+ /**
+ * Test replication of blocks with different storage levels (various combinations of
+ * memory, disk & serialization). For each storage level, this function checks every store
+ * that should hold the block, verifying both that the block is present and that the
+ * master's knowledge of the block is correct. It then drops the block from memory in
+ * each store (using LRU) and checks that the master's knowledge gets updated accordingly.
+ */
+ private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
+ import org.apache.spark.storage.StorageLevel._
+
+ assert(maxReplication > 1,
+ s"Cannot test replication factor $maxReplication")
+
+ // storage levels to test with the given replication factor
+
+ val storeSize = 10000
+ val blockSize = 1000
+
+ // As many stores as the replication factor
+ val stores = (1 to maxReplication).map {
+ i => makeBlockManager(storeSize, s"store$i")
+ }
+
+ storageLevels.foreach { storageLevel =>
+ // Put the block into one of the stores
+ val blockId = new TestBlockId(
+ "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
+ stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+
+ // Assert that the master knows the expected number of locations for the block
+ val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
+ assert(blockLocations.size === storageLevel.replication,
+ s"master did not have ${storageLevel.replication} locations for $blockId")
+
+ // Test state of the stores that contain the block
+ stores.filter {
+ testStore => blockLocations.contains(testStore.blockManagerId.executorId)
+ }.foreach { testStore =>
+ val testStoreName = testStore.blockManagerId.executorId
+ assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+ assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
+ s"master does not have status for ${blockId.name} in $testStoreName")
+
+ val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
+
+ // Assert that block status in the master for this store has expected storage level
+ assert(
+ blockStatus.storageLevel.useDisk === storageLevel.useDisk &&
+ blockStatus.storageLevel.useMemory === storageLevel.useMemory &&
+ blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap &&
+ blockStatus.storageLevel.deserialized === storageLevel.deserialized,
+ s"master does not know correct storage level for ${blockId.name} in $testStoreName")
+
+ // Assert that the block status in the master for this store has correct memory usage info
+ assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize,
+ s"master does not know size of ${blockId.name} stored in memory of $testStoreName")
+
+
+ // If the block is supposed to be in memory, then drop the copy of the block in
+ // this store and test whether the master is updated with zero memory usage for this store
+ if (storageLevel.useMemory) {
+ // Force the block to be dropped by adding a number of dummy blocks
+ (1 to 10).foreach {
+ i =>
+ testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
+ }
+ (1 to 10).foreach {
+ i => testStore.removeBlock(s"dummy-block-$i")
+ }
+
+ val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId)
+
+ // Assert that the block status in the master either does not exist (block removed
+ // from every store) or has zero memory usage for this store
+ assert(
+ newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0,
+ s"after dropping, master does not know size of ${blockId.name} " +
+ s"stored in memory of $testStoreName"
+ )
+ }
+
+ // If the block is supposed to be on disk (after dropping or otherwise), then
+ // test whether the master has the correct disk usage for this store
+ if (storageLevel.useDisk) {
+ assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize,
+ s"after dropping, master does not know size of ${blockId.name} " +
+ s"stored in disk of $testStoreName"
+ )
+ }
+ }
+ master.removeBlock(blockId)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e251660dae..9d96202a3e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -21,8 +21,6 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.Arrays
import java.util.concurrent.TimeUnit
-import org.apache.spark.network.nio.NioBlockTransferService
-
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -35,13 +33,13 @@ import akka.util.Timeout
import org.mockito.Mockito.{mock, when}
-import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._
-import org.scalatest.Matchers
import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.shuffle.hash.HashShuffleManager
@@ -189,7 +187,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store = makeBlockManager(2000, "exec1")
store2 = makeBlockManager(2000, "exec2")
- val peers = master.getPeers(store.blockManagerId, 1)
+ val peers = master.getPeers(store.blockManagerId)
assert(peers.size === 1, "master did not return the other manager as a peer")
assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
@@ -448,7 +446,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val list2DiskGet = store.get("list2disk")
assert(list2DiskGet.isDefined, "list2memory expected to be in store")
assert(list2DiskGet.get.data.size === 3)
- System.out.println(list2DiskGet)
// We don't know the exact size of the data on disk, but it should certainly be > 0.
assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)