diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-05-31 01:48:16 -0700 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-05-31 01:48:16 -0700 |
commit | ba5e544461e8ca9216af703033f6b0de6dbc56ec (patch) | |
tree | 0046dd625fa30e6895f6b72aba5ef47b526389b9 | |
parent | f6ad3781b1d9a044789f114d13787b9d05223da3 (diff) | |
download | spark-ba5e544461e8ca9216af703033f6b0de6dbc56ec.tar.gz spark-ba5e544461e8ca9216af703033f6b0de6dbc56ec.tar.bz2 spark-ba5e544461e8ca9216af703033f6b0de6dbc56ec.zip |
More block manager cleanup.
Implemented a removeRdd method in BlockManager, and use that to
implement RDD.unpersist. Previously, unpersist needs to send B akka
messages, where B = number of blocks. Now unpersist only needs to send W
akka messages, where W = the number of workers.
8 files changed, 187 insertions, 166 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index dde131696f..e6c0438d76 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -1,13 +1,10 @@ package spark -import java.net.URL -import java.util.{Date, Random} -import java.util.{HashMap => JHashMap} +import java.util.Random import scala.collection.Map import scala.collection.JavaConversions.mapAsScalaMap import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.NullWritable @@ -32,7 +29,6 @@ import spark.rdd.MapPartitionsWithIndexRDD import spark.rdd.PipedRDD import spark.rdd.SampledRDD import spark.rdd.ShuffledRDD -import spark.rdd.SubtractedRDD import spark.rdd.UnionRDD import spark.rdd.ZippedRDD import spark.rdd.ZippedPartitionsRDD2 @@ -141,10 +137,15 @@ abstract class RDD[T: ClassManifest]( /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): RDD[T] = persist() - /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */ - def unpersist(): RDD[T] = { + /** + * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. + * + * @param blocking Whether to block until all blocks are deleted. + * @return This RDD. + */ + def unpersist(blocking: Boolean = true): RDD[T] = { logInfo("Removing RDD " + id + " from persistence list") - sc.env.blockManager.master.removeRdd(id) + sc.env.blockManager.master.removeRdd(id, blocking) sc.persistentRdds.remove(id) storageLevel = StorageLevel.NONE this @@ -269,8 +270,8 @@ abstract class RDD[T: ClassManifest]( def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = { var fraction = 0.0 var total = 0 - var multiplier = 3.0 - var initialCount = count() + val multiplier = 3.0 + val initialCount = count() var maxSelected = 0 if (initialCount > Integer.MAX_VALUE - 1) { diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index d35c43f194..3a5d4ef448 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -3,8 +3,7 @@ package spark.storage import java.io.{InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} -import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet, Queue} -import scala.collection.JavaConversions._ +import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet} import akka.actor.{ActorSystem, Cancellable, Props} import akka.dispatch.{Await, Future} @@ -15,7 +14,7 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream -import spark.{Logging, SizeEstimator, SparkEnv, SparkException, Utils} +import spark.{Logging, SparkEnv, SparkException, Utils} import spark.network._ import spark.serializer.Serializer import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap} @@ -95,9 +94,11 @@ private[spark] class BlockManager( new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) // If we use Netty for shuffle, start a new Netty-based shuffle sender service. - private val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean - private val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt - private val nettyPort = if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0 + private val nettyPort: Int = { + val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean + val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt + if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0 + } val connectionManager = new ConnectionManager(0) implicit val futureExecContext = connectionManager.futureExecContext @@ -825,9 +826,23 @@ private[spark] class BlockManager( } /** + * Remove all blocks belonging to the given RDD. + * @return The number of blocks removed. + */ + def removeRdd(rddId: Int): Int = { + // TODO: Instead of doing a linear scan on the blockInfo map, create another map that maps + // from RDD.id to blocks. + logInfo("Removing RDD " + rddId) + val rddPrefix = "rdd_" + rddId + "_" + val blocksToRemove = blockInfo.filter(_._1.startsWith(rddPrefix)).map(_._1) + blocksToRemove.foreach(blockId => removeBlock(blockId, false)) + blocksToRemove.size + } + + /** * Remove a block from both memory and disk. */ - def removeBlock(blockId: String) { + def removeBlock(blockId: String, tellMaster: Boolean = true) { logInfo("Removing block " + blockId) val info = blockInfo.get(blockId).orNull if (info != null) info.synchronized { @@ -839,7 +854,7 @@ private[spark] class BlockManager( "the disk or memory store") } blockInfo.remove(blockId) - if (info.tellMaster) { + if (tellMaster && info.tellMaster) { reportBlockStatus(blockId, info) } } else { diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index ac26c16867..7099e40618 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -1,19 +1,11 @@ package spark.storage -import java.io._ -import java.util.{HashMap => JHashMap} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.util.Random - -import akka.actor.{Actor, ActorRef, ActorSystem, Props} -import akka.dispatch.Await +import akka.actor.ActorRef +import akka.dispatch.{Await, Future} import akka.pattern.ask -import akka.util.{Duration, Timeout} -import akka.util.duration._ +import akka.util.Duration -import spark.{Logging, SparkException, Utils} +import spark.{Logging, SparkException} private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging { @@ -91,15 +83,28 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi /** * Remove all blocks belonging to the given RDD. */ - def removeRdd(rddId: Int) { - val rddBlockPrefix = "rdd_" + rddId + "_" - // Get the list of blocks in block manager, and remove ones that are part of this RDD. - // The runtime complexity is linear to the number of blocks persisted in the cluster. - // It could be expensive if the cluster is large and has a lot of blocks persisted. - getStorageStatus.flatMap(_.blocks).foreach { case(blockId, status) => - if (blockId.startsWith(rddBlockPrefix)) { - removeBlock(blockId) - } + def removeRdd(rddId: Int, blocking: Boolean) { + // The logic to remove an RDD is somewhat complicated: + // 1. Send BlockManagerMasterActor a RemoveRdd message. + // 2. Upon receiving the RemoveRdd message, BlockManagerMasterActor will forward the message + // to all workers to remove blocks belonging to the RDD, and return a Future for the results. + // 3. The Future is sent back here, and on successful completion of the Future, this function + // sends a RemoveRddMetaData message to BlockManagerMasterActor. + // 4. Upon receiving the RemoveRddMetaData message, BlockManagerMasterActor will delete the meta + // data for the given RDD. + // + // The reason we are doing it this way is to reduce the amount of messages the driver sends. + // The number of messages that need to be sent is only the number of workers the cluster has, + // rather than the number of blocks in the cluster. Note that we can further reduce the number + // of messages by tracking for a given RDD, where are its blocks. Then we can send only to the + // workers that have the given RDD. But this remains future work. + val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId)) + future onComplete { + case Left(throwable) => logError("Failed to remove RDD " + rddId, throwable) + case Right(numBlocks) => tell(RemoveRddMetaData(rddId, numBlocks.sum)) + } + if (blocking) { + Await.result(future, timeout) } } @@ -114,7 +119,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi } def getStorageStatus: Array[StorageStatus] = { - askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray + askDriverWithReply[Array[StorageStatus]](GetStorageStatus) } /** Stop the driver actor, called only on the Spark driver node */ diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index 9b64f95df8..00aa97bf78 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -2,15 +2,16 @@ package spark.storage import java.util.{HashMap => JHashMap} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable import scala.collection.JavaConversions._ -import scala.util.Random import akka.actor.{Actor, ActorRef, Cancellable} -import akka.util.{Duration, Timeout} +import akka.dispatch.Future +import akka.pattern.ask +import akka.util.Duration import akka.util.duration._ -import spark.{Logging, Utils} +import spark.{Logging, Utils, SparkException} /** * BlockManagerMasterActor is an actor on the master node to track statuses of @@ -21,13 +22,16 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { // Mapping from block manager id to the block manager's information. private val blockManagerInfo = - new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo] + new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo] // Mapping from executor ID to block manager ID. - private val blockManagerIdByExecutor = new HashMap[String, BlockManagerId] + private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId] // Mapping from block id to the set of block managers that have the block. - private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] + private val blockLocations = new JHashMap[String, mutable.HashSet[BlockManagerId]] + + val akkaTimeout = Duration.create( + System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") initLogging() @@ -50,28 +54,38 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { def receive = { case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => register(blockManagerId, maxMemSize, slaveActor) + sender ! true case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => + // TODO: Ideally we want to handle all the message replies in receive instead of in the + // individual private methods. updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) case GetLocations(blockId) => - getLocations(blockId) + sender ! getLocations(blockId) case GetLocationsMultipleBlockIds(blockIds) => - getLocationsMultipleBlockIds(blockIds) + sender ! getLocationsMultipleBlockIds(blockIds) case GetPeers(blockManagerId, size) => - getPeersDeterministic(blockManagerId, size) - /*getPeers(blockManagerId, size)*/ + sender ! getPeers(blockManagerId, size) case GetMemoryStatus => - getMemoryStatus + sender ! memoryStatus case GetStorageStatus => - getStorageStatus + sender ! storageStatus + + case RemoveRdd(rddId) => + sender ! removeRdd(rddId) + + case RemoveRddMetaData(rddId, numBlocks) => + removeRddMetaData(rddId, numBlocks) + sender ! true case RemoveBlock(blockId) => - removeBlock(blockId) + removeBlockFromWorkers(blockId) + sender ! true case RemoveExecutor(execId) => removeExecutor(execId) @@ -81,7 +95,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { logInfo("Stopping BlockManagerMaster") sender ! true if (timeoutCheckingTask != null) { - timeoutCheckingTask.cancel + timeoutCheckingTask.cancel() } context.stop(self) @@ -89,13 +103,34 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { expireDeadHosts() case HeartBeat(blockManagerId) => - heartBeat(blockManagerId) + sender ! heartBeat(blockManagerId) case other => - logInfo("Got unknown message: " + other) + logWarning("Got unknown message: " + other) + } + + private def removeRdd(rddId: Int): Future[Seq[Int]] = { + // Ask the slaves to remove the RDD, and put the result in a sequence of Futures. + // The dispatcher is used as an implicit argument into the Future sequence construction. + import context.dispatcher + Future.sequence(blockManagerInfo.values.map { bm => + bm.slaveActor.ask(RemoveRdd(rddId))(akkaTimeout).mapTo[Int] + }.toSeq) + } + + private def removeRddMetaData(rddId: Int, numBlocks: Int) { + val prefix = "rdd_" + rddId + "_" + // Find all blocks for the given RDD, remove the block from both blockLocations and + // the blockManagerInfo that is tracking the blocks. + val blocks = blockLocations.keySet().filter(_.startsWith(prefix)) + blocks.foreach { blockId => + val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId) + bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId))) + blockLocations.remove(blockId) + } } - def removeBlockManager(blockManagerId: BlockManagerId) { + private def removeBlockManager(blockManagerId: BlockManagerId) { val info = blockManagerInfo(blockManagerId) // Remove the block manager from blockManagerIdByExecutor. @@ -106,7 +141,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { val iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next - val locations = blockLocations.get(blockId)._2 + val locations = blockLocations.get(blockId) locations -= blockManagerId if (locations.size == 0) { blockLocations.remove(locations) @@ -114,11 +149,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { } } - def expireDeadHosts() { + private def expireDeadHosts() { logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.") val now = System.currentTimeMillis() val minSeenTime = now - slaveTimeout - val toRemove = new HashSet[BlockManagerId] + val toRemove = new mutable.HashSet[BlockManagerId] for (info <- blockManagerInfo.values) { if (info.lastSeenMs < minSeenTime) { logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " + @@ -129,31 +164,26 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { toRemove.foreach(removeBlockManager) } - def removeExecutor(execId: String) { + private def removeExecutor(execId: String) { logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) - sender ! true } - def heartBeat(blockManagerId: BlockManagerId) { + private def heartBeat(blockManagerId: BlockManagerId): Boolean = { if (!blockManagerInfo.contains(blockManagerId)) { - if (blockManagerId.executorId == "<driver>" && !isLocal) { - sender ! true - } else { - sender ! false - } + blockManagerId.executorId == "<driver>" && !isLocal } else { blockManagerInfo(blockManagerId).updateLastSeenMs() - sender ! true + true } } // Remove a block from the slaves that have it. This can only be used to remove // blocks that the master knows about. - private def removeBlock(blockId: String) { - val block = blockLocations.get(blockId) - if (block != null) { - block._2.foreach { blockManagerId: BlockManagerId => + private def removeBlockFromWorkers(blockId: String) { + val locations = blockLocations.get(blockId) + if (locations != null) { + locations.foreach { blockManagerId: BlockManagerId => val blockManager = blockManagerInfo.get(blockManagerId) if (blockManager.isDefined) { // Remove the block from the slave's BlockManager. @@ -163,23 +193,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { } } } - sender ! true } // Return a map from the block manager id to max memory and remaining memory. - private def getMemoryStatus() { - val res = blockManagerInfo.map { case(blockManagerId, info) => + private def memoryStatus: Map[BlockManagerId, (Long, Long)] = { + blockManagerInfo.map { case(blockManagerId, info) => (blockManagerId, (info.maxMem, info.remainingMem)) }.toMap - sender ! res } - private def getStorageStatus() { - val res = blockManagerInfo.map { case(blockManagerId, info) => + private def storageStatus: Array[StorageStatus] = { + blockManagerInfo.map { case(blockManagerId, info) => import collection.JavaConverters._ StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap) - } - sender ! res + }.toArray } private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { @@ -188,7 +215,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { } else if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(manager) => - // A block manager of the same host name already exists + // A block manager of the same executor already exists. + // This should never happen. Let's just quit. logError("Got two different block manager registrations on " + id.executorId) System.exit(1) case None => @@ -197,7 +225,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo( id, System.currentTimeMillis(), maxMemSize, slaveActor) } - sender ! true } private def updateBlockInfo( @@ -226,12 +253,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) - var locations: HashSet[BlockManagerId] = null + var locations: mutable.HashSet[BlockManagerId] = null if (blockLocations.containsKey(blockId)) { - locations = blockLocations.get(blockId)._2 + locations = blockLocations.get(blockId) } else { - locations = new HashSet[BlockManagerId] - blockLocations.put(blockId, (storageLevel.replication, locations)) + locations = new mutable.HashSet[BlockManagerId] + blockLocations.put(blockId, locations) } if (storageLevel.isValid) { @@ -247,70 +274,24 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { sender ! true } - private def getLocations(blockId: String) { - val startTimeMs = System.currentTimeMillis() - val tmp = " " + blockId + " " - if (blockLocations.containsKey(blockId)) { - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - res.appendAll(blockLocations.get(blockId)._2) - sender ! res.toSeq - } else { - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - sender ! res - } - } - - private def getLocationsMultipleBlockIds(blockIds: Array[String]) { - def getLocations(blockId: String): Seq[BlockManagerId] = { - val tmp = blockId - if (blockLocations.containsKey(blockId)) { - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - res.appendAll(blockLocations.get(blockId)._2) - return res.toSeq - } else { - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - return res.toSeq - } - } - - var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]] - for (blockId <- blockIds) { - res.append(getLocations(blockId)) - } - sender ! res.toSeq + private def getLocations(blockId: String): Seq[BlockManagerId] = { + if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty } - private def getPeers(blockManagerId: BlockManagerId, size: Int) { - var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - res.appendAll(peers) - res -= blockManagerId - val rand = new Random(System.currentTimeMillis()) - while (res.length > size) { - res.remove(rand.nextInt(res.length)) - } - sender ! res.toSeq + private def getLocationsMultipleBlockIds(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = { + blockIds.map(blockId => getLocations(blockId)) } - private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) { - var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] + private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = { + val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray val selfIndex = peers.indexOf(blockManagerId) if (selfIndex == -1) { - throw new Exception("Self index for " + blockManagerId + " not found") + throw new SparkException("Self index for " + blockManagerId + " not found") } // Note that this logic will select the same node multiple times if there aren't enough peers - var index = selfIndex - while (res.size < size) { - index += 1 - if (index == selfIndex) { - throw new Exception("More peer expected than available") - } - res += peers(index % peers.size) - } - sender ! res.toSeq + Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq } } @@ -384,6 +365,13 @@ object BlockManagerMasterActor { } } + def removeBlock(blockId: String) { + if (_blocks.containsKey(blockId)) { + _remainingMem += _blocks.get(blockId).memSize + _blocks.remove(blockId) + } + } + def remainingMem: Long = _remainingMem def lastSeenMs: Long = _lastSeenMs diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala index cff48d9909..88268fd41b 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala @@ -16,6 +16,12 @@ sealed trait ToBlockManagerSlave private[spark] case class RemoveBlock(blockId: String) extends ToBlockManagerSlave +// Remove all blocks belonging to a specific RDD. +private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave + +// Remove the meta data for a RDD. This is only sent to the master by the master. +private[spark] case class RemoveRddMetaData(rddId: Int, numBlocks: Int) extends ToBlockManagerMaster + ////////////////////////////////////////////////////////////////////////////////// // Messages from slaves to the master. diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala index f570cdc52d..b264d1deb5 100644 --- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala @@ -11,6 +11,12 @@ import spark.{Logging, SparkException, Utils} */ class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor { override def receive = { - case RemoveBlock(blockId) => blockManager.removeBlock(blockId) + + case RemoveBlock(blockId) => + blockManager.removeBlock(blockId) + + case RemoveRdd(rddId) => + val numBlocksRemoved = blockManager.removeRdd(rddId) + sender ! numBlocksRemoved } } diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala index 15225f93a6..3057ade233 100644 --- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala @@ -2,13 +2,7 @@ package spark.storage import java.nio.ByteBuffer -import scala.actors._ -import scala.actors.Actor._ -import scala.actors.remote._ -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.util.Random - -import spark.{Logging, Utils, SparkEnv} +import spark.{Logging, Utils} import spark.network._ /** @@ -88,8 +82,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends private[spark] object BlockManagerWorker extends Logging { private var blockManagerWorker: BlockManagerWorker = null - private val DATA_TRANSFER_TIME_OUT_MS: Long = 500 - private val REQUEST_RETRY_INTERVAL_MS: Long = 1000 initLogging() diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index bff2475686..b9d5f9668e 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -15,10 +15,10 @@ import org.scalatest.time.SpanSugar._ import spark.JavaSerializer import spark.KryoSerializer import spark.SizeEstimator -import spark.Utils import spark.util.AkkaUtils import spark.util.ByteBufferInputStream + class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { var store: BlockManager = null var store2: BlockManager = null @@ -124,7 +124,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) - store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) // Checking whether blocks are in memory assert(store.getSingle("a1") != None, "a1 was not in store") @@ -170,7 +170,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY) - store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false) + store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) // Checking whether blocks are in memory and memory size val memStatus = master.getMemoryStatus.head._2 @@ -218,7 +218,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY) store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) - master.removeRdd(0) + master.removeRdd(0, blocking = false) eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { store.getSingle("rdd_0_0") should be (None) @@ -232,6 +232,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.getSingle("nonrddblock") should not be (None) master.getLocations("nonrddblock") should have size (1) } + + store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY) + master.removeRdd(0, blocking = true) + store.getSingle("rdd_0_0") should be (None) + master.getLocations("rdd_0_0") should have size 0 + store.getSingle("rdd_0_1") should be (None) + master.getLocations("rdd_0_1") should have size 0 } test("reregistration on heart beat") { @@ -262,7 +270,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT master.removeExecutor(store.blockManagerId.executorId) assert(master.getLocations("a1").size == 0, "a1 was not removed from master") - store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) store.waitForAsyncReregister() assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") @@ -280,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT master.removeExecutor(store.blockManagerId.executorId) val t1 = new Thread { override def run() { - store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, true) + store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } } val t2 = new Thread { @@ -490,9 +498,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true) - store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) assert(store.get("list3") != None, "list3 was not in store") @@ -501,7 +509,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list1") != None, "list1 was not in store") assert(store.get("list1").get.size == 2) assert(store.get("list2") != None, "list2 was not in store") @@ -516,9 +524,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val list3 = List(new Array[Byte](200), new Array[Byte](200)) val list4 = List(new Array[Byte](200), new Array[Byte](200)) // First store list1 and list2, both in memory, and list3, on disk only - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true) - store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) + store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) // At this point LRU should not kick in because list3 is only on disk assert(store.get("list1") != None, "list2 was not in store") assert(store.get("list1").get.size === 2) @@ -533,7 +541,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list3") != None, "list1 was not in store") assert(store.get("list3").get.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out - store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true) + store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2") != None, "list3 was not in store") assert(store.get("list2").get.size === 2) |