author     Reynold Xin <rxin@cs.berkeley.edu>  2013-05-31 01:48:16 -0700
committer  Reynold Xin <rxin@cs.berkeley.edu>  2013-05-31 01:48:16 -0700
commit     ba5e544461e8ca9216af703033f6b0de6dbc56ec (patch)
tree       0046dd625fa30e6895f6b72aba5ef47b526389b9
parent     f6ad3781b1d9a044789f114d13787b9d05223da3 (diff)
More block manager cleanup.
Implemented a removeRdd method in BlockManager, and used it to implement RDD.unpersist. Previously, unpersist needed to send B Akka messages, where B = the number of blocks. Now it only needs to send W Akka messages, where W = the number of workers in the cluster.
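
For context, a minimal usage sketch of the new API (hypothetical driver code; assumes an existing SparkContext named sc):

    val rdd = sc.parallelize(1 to 1000).cache()
    rdd.count()                      // materialize the cached blocks
    rdd.unpersist(blocking = false)  // asynchronous removal; blocking = true (the default)
                                     // waits until all blocks are deleted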
-rw-r--r--  core/src/main/scala/spark/RDD.scala                             |  21
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala            |  31
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala      |  49
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 192
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMessages.scala    |   6
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala  |   8
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerWorker.scala      |  10
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala       |  36
8 files changed, 187 insertions, 166 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index dde131696f..e6c0438d76 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,13 +1,10 @@
package spark
-import java.net.URL
-import java.util.{Date, Random}
-import java.util.{HashMap => JHashMap}
+import java.util.Random
import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
@@ -32,7 +29,6 @@ import spark.rdd.MapPartitionsWithIndexRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
import spark.rdd.ShuffledRDD
-import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
import spark.rdd.ZippedPartitionsRDD2
@@ -141,10 +137,15 @@ abstract class RDD[T: ClassManifest](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): RDD[T] = persist()
- /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */
- def unpersist(): RDD[T] = {
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ * @return This RDD.
+ */
+ def unpersist(blocking: Boolean = true): RDD[T] = {
logInfo("Removing RDD " + id + " from persistence list")
- sc.env.blockManager.master.removeRdd(id)
+ sc.env.blockManager.master.removeRdd(id, blocking)
sc.persistentRdds.remove(id)
storageLevel = StorageLevel.NONE
this
@@ -269,8 +270,8 @@ abstract class RDD[T: ClassManifest](
def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
var fraction = 0.0
var total = 0
- var multiplier = 3.0
- var initialCount = count()
+ val multiplier = 3.0
+ val initialCount = count()
var maxSelected = 0
if (initialCount > Integer.MAX_VALUE - 1) {
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index d35c43f194..3a5d4ef448 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -3,8 +3,7 @@ package spark.storage
import java.io.{InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
-import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet, Queue}
-import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
import akka.actor.{ActorSystem, Cancellable, Props}
import akka.dispatch.{Await, Future}
@@ -15,7 +14,7 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import spark.{Logging, SizeEstimator, SparkEnv, SparkException, Utils}
+import spark.{Logging, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
@@ -95,9 +94,11 @@ private[spark] class BlockManager(
new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
- private val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
- private val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
- private val nettyPort = if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
+ private val nettyPort: Int = {
+ val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
+ val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
+ if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
+ }
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
@@ -825,9 +826,23 @@ private[spark] class BlockManager(
}
/**
+ * Remove all blocks belonging to the given RDD.
+ * @return The number of blocks removed.
+ */
+ def removeRdd(rddId: Int): Int = {
+ // TODO: Instead of doing a linear scan on the blockInfo map, create another map that maps
+ // from RDD.id to blocks.
+ logInfo("Removing RDD " + rddId)
+ val rddPrefix = "rdd_" + rddId + "_"
+ val blocksToRemove = blockInfo.filter(_._1.startsWith(rddPrefix)).map(_._1)
+ blocksToRemove.foreach(blockId => removeBlock(blockId, false))
+ blocksToRemove.size
+ }
+
+ /**
* Remove a block from both memory and disk.
*/
- def removeBlock(blockId: String) {
+ def removeBlock(blockId: String, tellMaster: Boolean = true) {
logInfo("Removing block " + blockId)
val info = blockInfo.get(blockId).orNull
if (info != null) info.synchronized {
@@ -839,7 +854,7 @@ private[spark] class BlockManager(
"the disk or memory store")
}
blockInfo.remove(blockId)
- if (info.tellMaster) {
+ if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info)
}
} else {
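
The TODO in removeRdd above suggests replacing the linear scan over blockInfo with a secondary index from RDD id to block ids. A hypothetical sketch of such an index (not part of this commit; names are illustrative, and the members would live inside BlockManager), relying on the "rdd_<rddId>_<splitIndex>" block id convention used in this diff:

    import scala.collection.mutable

    // Hypothetical index, maintained alongside blockInfo whenever a block is put.
    private val blocksByRdd = new mutable.HashMap[Int, mutable.HashSet[String]]

    private def indexBlock(blockId: String) {
      if (blockId.startsWith("rdd_")) {
        val rddId = blockId.split('_')(1).toInt
        blocksByRdd.getOrElseUpdate(rddId, new mutable.HashSet[String]) += blockId
      }
    }

    // removeRdd could then consume blocksByRdd.remove(rddId) instead of scanning blockInfo.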
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index ac26c16867..7099e40618 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,19 +1,11 @@
package spark.storage
-import java.io._
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Random
-
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.dispatch.Await
+import akka.actor.ActorRef
+import akka.dispatch.{Await, Future}
import akka.pattern.ask
-import akka.util.{Duration, Timeout}
-import akka.util.duration._
+import akka.util.Duration
-import spark.{Logging, SparkException, Utils}
+import spark.{Logging, SparkException}
private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
@@ -91,15 +83,28 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
/**
* Remove all blocks belonging to the given RDD.
*/
- def removeRdd(rddId: Int) {
- val rddBlockPrefix = "rdd_" + rddId + "_"
- // Get the list of blocks in block manager, and remove ones that are part of this RDD.
- // The runtime complexity is linear to the number of blocks persisted in the cluster.
- // It could be expensive if the cluster is large and has a lot of blocks persisted.
- getStorageStatus.flatMap(_.blocks).foreach { case(blockId, status) =>
- if (blockId.startsWith(rddBlockPrefix)) {
- removeBlock(blockId)
- }
+ def removeRdd(rddId: Int, blocking: Boolean) {
+ // The logic to remove an RDD is somewhat complicated:
+ // 1. Send BlockManagerMasterActor a RemoveRdd message.
+ // 2. Upon receiving the RemoveRdd message, BlockManagerMasterActor will forward the message
+ // to all workers to remove blocks belonging to the RDD, and return a Future for the results.
+ // 3. The Future is sent back here, and on successful completion of the Future, this function
+ // sends a RemoveRddMetaData message to BlockManagerMasterActor.
+ // 4. Upon receiving the RemoveRddMetaData message, BlockManagerMasterActor will delete the meta
+ // data for the given RDD.
+ //
+ // The reason we are doing it this way is to reduce the number of messages the driver sends.
+ // The number of messages that need to be sent is only the number of workers in the cluster,
+ // rather than the number of blocks in the cluster. Note that we can further reduce the number
+ // of messages by tracking, for each RDD, where its blocks are. Then we could send messages only
+ // to the workers that hold blocks for the given RDD. But this remains future work.
+ val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
+ future onComplete {
+ case Left(throwable) => logError("Failed to remove RDD " + rddId, throwable)
+ case Right(numBlocks) => tell(RemoveRddMetaData(rddId, numBlocks.sum))
+ }
+ if (blocking) {
+ Await.result(future, timeout)
}
}
@@ -114,7 +119,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
}
def getStorageStatus: Array[StorageStatus] = {
- askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+ askDriverWithReply[Array[StorageStatus]](GetStorageStatus)
}
/** Stop the driver actor, called only on the Spark driver node */
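
The removeRdd implementation above uses the standard Akka 2.0 ask-then-continue idiom: obtain a Future for the reply, attach an onComplete callback that matches on Either, and Await only when the caller requests blocking semantics. A minimal standalone sketch (driverActor and SomeQuery are hypothetical names, not from this commit):

    import akka.actor.ActorRef
    import akka.dispatch.{Await, Future}
    import akka.pattern.ask
    import akka.util.duration._

    val timeout = 10 seconds
    // ask returns a Future of the untyped reply; mapTo recovers the static type.
    val future: Future[Int] = driverActor.ask(SomeQuery)(timeout).mapTo[Int]
    future onComplete {
      case Left(throwable) => // failure: the ask failed or timed out
      case Right(result)   => // success: use the typed reply
    }
    Await.result(future, timeout)  // block only if blocking semantics were requested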
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 9b64f95df8..00aa97bf78 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -2,15 +2,16 @@ package spark.storage
import java.util.{HashMap => JHashMap}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable
import scala.collection.JavaConversions._
-import scala.util.Random
import akka.actor.{Actor, ActorRef, Cancellable}
-import akka.util.{Duration, Timeout}
+import akka.dispatch.Future
+import akka.pattern.ask
+import akka.util.Duration
import akka.util.duration._
-import spark.{Logging, Utils}
+import spark.{Logging, Utils, SparkException}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -21,13 +22,16 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block manager id to the block manager's information.
private val blockManagerInfo =
- new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
+ new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
// Mapping from executor ID to block manager ID.
- private val blockManagerIdByExecutor = new HashMap[String, BlockManagerId]
+ private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
// Mapping from block id to the set of block managers that have the block.
- private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
+ private val blockLocations = new JHashMap[String, mutable.HashSet[BlockManagerId]]
+
+ val akkaTimeout = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
initLogging()
@@ -50,28 +54,38 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
def receive = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
register(blockManagerId, maxMemSize, slaveActor)
+ sender ! true
case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ // TODO: Ideally we want to handle all the message replies in receive instead of in the
+ // individual private methods.
updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
- getLocations(blockId)
+ sender ! getLocations(blockId)
case GetLocationsMultipleBlockIds(blockIds) =>
- getLocationsMultipleBlockIds(blockIds)
+ sender ! getLocationsMultipleBlockIds(blockIds)
case GetPeers(blockManagerId, size) =>
- getPeersDeterministic(blockManagerId, size)
- /*getPeers(blockManagerId, size)*/
+ sender ! getPeers(blockManagerId, size)
case GetMemoryStatus =>
- getMemoryStatus
+ sender ! memoryStatus
case GetStorageStatus =>
- getStorageStatus
+ sender ! storageStatus
+
+ case RemoveRdd(rddId) =>
+ sender ! removeRdd(rddId)
+
+ case RemoveRddMetaData(rddId, numBlocks) =>
+ removeRddMetaData(rddId, numBlocks)
+ sender ! true
case RemoveBlock(blockId) =>
- removeBlock(blockId)
+ removeBlockFromWorkers(blockId)
+ sender ! true
case RemoveExecutor(execId) =>
removeExecutor(execId)
@@ -81,7 +95,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
logInfo("Stopping BlockManagerMaster")
sender ! true
if (timeoutCheckingTask != null) {
- timeoutCheckingTask.cancel
+ timeoutCheckingTask.cancel()
}
context.stop(self)
@@ -89,13 +103,34 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
expireDeadHosts()
case HeartBeat(blockManagerId) =>
- heartBeat(blockManagerId)
+ sender ! heartBeat(blockManagerId)
case other =>
- logInfo("Got unknown message: " + other)
+ logWarning("Got unknown message: " + other)
+ }
+
+ private def removeRdd(rddId: Int): Future[Seq[Int]] = {
+ // Ask the slaves to remove the RDD, collecting the replies as a sequence of Futures.
+ // The dispatcher is used as an implicit argument to the Future.sequence call below.
+ import context.dispatcher
+ Future.sequence(blockManagerInfo.values.map { bm =>
+ bm.slaveActor.ask(RemoveRdd(rddId))(akkaTimeout).mapTo[Int]
+ }.toSeq)
+ }
+
+ private def removeRddMetaData(rddId: Int, numBlocks: Int) {
+ val prefix = "rdd_" + rddId + "_"
+ // Find all blocks for the given RDD, and remove each block from both blockLocations and
+ // every blockManagerInfo that is tracking it.
+ val blocks = blockLocations.keySet().filter(_.startsWith(prefix))
+ blocks.foreach { blockId =>
+ val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
+ bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
+ blockLocations.remove(blockId)
+ }
}
- def removeBlockManager(blockManagerId: BlockManagerId) {
+ private def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
// Remove the block manager from blockManagerIdByExecutor.
@@ -106,7 +141,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
- val locations = blockLocations.get(blockId)._2
+ val locations = blockLocations.get(blockId)
locations -= blockManagerId
if (locations.size == 0) {
blockLocations.remove(blockId)
@@ -114,11 +149,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
}
- def expireDeadHosts() {
+ private def expireDeadHosts() {
logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
- val toRemove = new HashSet[BlockManagerId]
+ val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
@@ -129,31 +164,26 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
toRemove.foreach(removeBlockManager)
}
- def removeExecutor(execId: String) {
+ private def removeExecutor(execId: String) {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
- sender ! true
}
- def heartBeat(blockManagerId: BlockManagerId) {
+ private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.executorId == "<driver>" && !isLocal) {
- sender ! true
- } else {
- sender ! false
- }
+ blockManagerId.executorId == "<driver>" && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
- sender ! true
+ true
}
}
// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
- private def removeBlock(blockId: String) {
- val block = blockLocations.get(blockId)
- if (block != null) {
- block._2.foreach { blockManagerId: BlockManagerId =>
+ private def removeBlockFromWorkers(blockId: String) {
+ val locations = blockLocations.get(blockId)
+ if (locations != null) {
+ locations.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
if (blockManager.isDefined) {
// Remove the block from the slave's BlockManager.
@@ -163,23 +193,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
}
}
- sender ! true
}
// Return a map from the block manager id to max memory and remaining memory.
- private def getMemoryStatus() {
- val res = blockManagerInfo.map { case(blockManagerId, info) =>
+ private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
+ blockManagerInfo.map { case(blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
- sender ! res
}
- private def getStorageStatus() {
- val res = blockManagerInfo.map { case(blockManagerId, info) =>
+ private def storageStatus: Array[StorageStatus] = {
+ blockManagerInfo.map { case(blockManagerId, info) =>
import collection.JavaConverters._
StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
- }
- sender ! res
+ }.toArray
}
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
@@ -188,7 +215,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
} else if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(manager) =>
- // A block manager of the same host name already exists
+ // A block manager of the same executor already exists.
+ // This should never happen. Let's just quit.
logError("Got two different block manager registrations on " + id.executorId)
System.exit(1)
case None =>
@@ -197,7 +225,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
id, System.currentTimeMillis(), maxMemSize, slaveActor)
}
- sender ! true
}
private def updateBlockInfo(
@@ -226,12 +253,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
- var locations: HashSet[BlockManagerId] = null
+ var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
- locations = blockLocations.get(blockId)._2
+ locations = blockLocations.get(blockId)
} else {
- locations = new HashSet[BlockManagerId]
- blockLocations.put(blockId, (storageLevel.replication, locations))
+ locations = new mutable.HashSet[BlockManagerId]
+ blockLocations.put(blockId, locations)
}
if (storageLevel.isValid) {
@@ -247,70 +274,24 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
sender ! true
}
- private def getLocations(blockId: String) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockId + " "
- if (blockLocations.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockLocations.get(blockId)._2)
- sender ! res.toSeq
- } else {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- sender ! res
- }
- }
-
- private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
- def getLocations(blockId: String): Seq[BlockManagerId] = {
- val tmp = blockId
- if (blockLocations.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockLocations.get(blockId)._2)
- return res.toSeq
- } else {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- return res.toSeq
- }
- }
-
- var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
- for (blockId <- blockIds) {
- res.append(getLocations(blockId))
- }
- sender ! res.toSeq
+ private def getLocations(blockId: String): Seq[BlockManagerId] = {
+ if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
- private def getPeers(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(peers)
- res -= blockManagerId
- val rand = new Random(System.currentTimeMillis())
- while (res.length > size) {
- res.remove(rand.nextInt(res.length))
- }
- sender ! res.toSeq
+ private def getLocationsMultipleBlockIds(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
+ blockIds.map(blockId => getLocations(blockId))
}
- private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
+ val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
val selfIndex = peers.indexOf(blockManagerId)
if (selfIndex == -1) {
- throw new Exception("Self index for " + blockManagerId + " not found")
+ throw new SparkException("Self index for " + blockManagerId + " not found")
}
// Note that this logic will select the same node multiple times if there aren't enough peers
- var index = selfIndex
- while (res.size < size) {
- index += 1
- if (index == selfIndex) {
- throw new Exception("More peer expected than available")
- }
- res += peers(index % peers.size)
- }
- sender ! res.toSeq
+ Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
}
}
@@ -384,6 +365,13 @@ object BlockManagerMasterActor {
}
}
+ def removeBlock(blockId: String) {
+ if (_blocks.containsKey(blockId)) {
+ _remainingMem += _blocks.get(blockId).memSize
+ _blocks.remove(blockId)
+ }
+ }
+
def remainingMem: Long = _remainingMem
def lastSeenMs: Long = _lastSeenMs
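
The rewritten getPeers replaces the old while-loop with a single Array.tabulate over the ring of registered block managers, choosing the size successors of the calling node. A small worked sketch of the index arithmetic (standalone; plain Ints stand in for BlockManagerIds):

    // With 4 peers, selfIndex = 1 and size = 3, the selected indices are 2, 3, 0:
    // the three ring successors of self, wrapping around and skipping self.
    val peers = Array(10, 11, 12, 13)
    val selfIndex = 1
    val size = 3
    val chosen = Array.tabulate[Int](size) { i => peers((selfIndex + i + 1) % peers.length) }
    // chosen == Array(12, 13, 10). If size > peers.length - 1, nodes repeat,
    // matching the "select the same node multiple times" caveat in the code.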
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index cff48d9909..88268fd41b 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -16,6 +16,12 @@ sealed trait ToBlockManagerSlave
private[spark]
case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+// Remove all blocks belonging to a specific RDD.
+private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
+
+// Remove the metadata for an RDD. This is only sent by the master to itself.
+private[spark] case class RemoveRddMetaData(rddId: Int, numBlocks: Int) extends ToBlockManagerMaster
+
//////////////////////////////////////////////////////////////////////////////////
// Messages from slaves to the master.
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
index f570cdc52d..b264d1deb5 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -11,6 +11,12 @@ import spark.{Logging, SparkException, Utils}
*/
class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
override def receive = {
- case RemoveBlock(blockId) => blockManager.removeBlock(blockId)
+
+ case RemoveBlock(blockId) =>
+ blockManager.removeBlock(blockId)
+
+ case RemoveRdd(rddId) =>
+ val numBlocksRemoved = blockManager.removeRdd(rddId)
+ sender ! numBlocksRemoved
}
}
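
The Int reply produced here is what the master's removeRdd (earlier in this diff) gathers from every worker. A minimal standalone sketch of that fan-out under Akka 2.0, assuming a hypothetical ActorSystem named system, worker refs slaves: Seq[ActorRef], and an rddId: Int in scope:

    import akka.actor.ActorRef
    import akka.dispatch.Future
    import akka.pattern.ask
    import akka.util.duration._

    // Future.sequence needs an implicit executor, hence the dispatcher here
    // (the master actor uses import context.dispatcher for the same reason).
    implicit val dispatcher = system.dispatcher
    val timeout = 10 seconds

    // One ask per worker, then collapse Seq[Future[Int]] into Future[Seq[Int]].
    val perWorker: Seq[Future[Int]] =
      slaves.map(slave => slave.ask(RemoveRdd(rddId))(timeout).mapTo[Int])
    val allCounts: Future[Seq[Int]] = Future.sequence(perWorker)
    // allCounts.map(_.sum) would yield the total number of blocks removed cluster-wide.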
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index 15225f93a6..3057ade233 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -2,13 +2,7 @@ package spark.storage
import java.nio.ByteBuffer
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.remote._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Random
-
-import spark.{Logging, Utils, SparkEnv}
+import spark.{Logging, Utils}
import spark.network._
/**
@@ -88,8 +82,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
private[spark] object BlockManagerWorker extends Logging {
private var blockManagerWorker: BlockManagerWorker = null
- private val DATA_TRANSFER_TIME_OUT_MS: Long = 500
- private val REQUEST_RETRY_INTERVAL_MS: Long = 1000
initLogging()
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index bff2475686..b9d5f9668e 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -15,10 +15,10 @@ import org.scalatest.time.SpanSugar._
import spark.JavaSerializer
import spark.KryoSerializer
import spark.SizeEstimator
-import spark.Utils
import spark.util.AkkaUtils
import spark.util.ByteBufferInputStream
+
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
var store: BlockManager = null
var store2: BlockManager = null
@@ -124,7 +124,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory
assert(store.getSingle("a1") != None, "a1 was not in store")
@@ -170,7 +170,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false)
+ store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
@@ -218,7 +218,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
- master.removeRdd(0)
+ master.removeRdd(0, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
store.getSingle("rdd_0_0") should be (None)
@@ -232,6 +232,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.getSingle("nonrddblock") should not be (None)
master.getLocations("nonrddblock") should have size (1)
}
+
+ store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+ master.removeRdd(0, blocking = true)
+ store.getSingle("rdd_0_0") should be (None)
+ master.getLocations("rdd_0_0") should have size 0
+ store.getSingle("rdd_0_1") should be (None)
+ master.getLocations("rdd_0_1") should have size 0
}
test("reregistration on heart beat") {
@@ -262,7 +270,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
- store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()
assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -280,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
override def run() {
- store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
}
val t2 = new Thread {
@@ -490,9 +498,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
- store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
assert(store.get("list3") != None, "list3 was not in store")
@@ -501,7 +509,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
// At this point list2 was accessed last, so LRU will get rid of list3
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list1") != None, "list1 was not in store")
assert(store.get("list1").get.size == 2)
assert(store.get("list2") != None, "list2 was not in store")
@@ -516,9 +524,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list3 = List(new Array[Byte](200), new Array[Byte](200))
val list4 = List(new Array[Byte](200), new Array[Byte](200))
// First store list1 and list2, both in memory, and list3, on disk only
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
- store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
// At this point LRU should not kick in because list3 is only on disk
assert(store.get("list1") != None, "list2 was not in store")
assert(store.get("list1").get.size === 2)
@@ -533,7 +541,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list3") != None, "list1 was not in store")
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
+ store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2)