about summary refs log tree commit diff
diff options
context:
space:
mode:
author Reynold Xin <rxin@cs.berkeley.edu> 2013-05-31 15:54:57 -0700
committer Reynold Xin <rxin@cs.berkeley.edu> 2013-05-31 15:54:57 -0700
commit de1167bf2c32d52c865a4a0c7213b665ebd61f93 (patch)
tree 014841b36dc99804dcd7cdd13325e386bff2e81a
parent ba5e544461e8ca9216af703033f6b0de6dbc56ec (diff)
download spark-de1167bf2c32d52c865a4a0c7213b665ebd61f93.tar.gz
spark-de1167bf2c32d52c865a4a0c7213b665ebd61f93.tar.bz2
spark-de1167bf2c32d52c865a4a0c7213b665ebd61f93.zip
Incorporated Charles' feedback to put rdd metadata removal in
BlockManagerMasterActor.
-rw-r--r-- core/src/main/scala/spark/storage/BlockManagerMaster.scala 21
-rw-r--r-- core/src/main/scala/spark/storage/BlockManagerMasterActor.scala 22
-rw-r--r-- core/src/main/scala/spark/storage/BlockManagerMessages.scala 3
3 files changed, 13 insertions, 33 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 7099e40618..58888b1ebb 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -84,24 +84,9 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
* Remove all blocks belonging to the given RDD.
*/
def removeRdd(rddId: Int, blocking: Boolean) {
- // The logic to remove an RDD is somewhat complicated:
- // 1. Send BlockManagerMasterActor a RemoveRdd message.
- // 2. Upon receiving the RemoveRdd message, BlockManagerMasterActor will forward the message
- // to all workers to remove blocks belonging to the RDD, and return a Future for the results.
- // 3. The Future is sent back here, and on successful completion of the Future, this function
- // sends a RemoveRddMetaData message to BlockManagerMasterActor.
- // 4. Upon receiving the RemoveRddMetaData message, BlockManagerMasterActor will delete the meta
- // data for the given RDD.
- //
- // The reason we are doing it this way is to reduce the amount of messages the driver sends.
- // The number of messages that need to be sent is only the number of workers the cluster has,
- // rather than the number of blocks in the cluster. Note that we can further reduce the number
- // of messages by tracking for a given RDD, where are its blocks. Then we can send only to the
- // workers that have the given RDD. But this remains future work.
val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
- future onComplete {
- case Left(throwable) => logError("Failed to remove RDD " + rddId, throwable)
- case Right(numBlocks) => tell(RemoveRddMetaData(rddId, numBlocks.sum))
+ future onFailure {
+ case e: Throwable => logError("Failed to remove RDD " + rddId, e)
}
if (blocking) {
Await.result(future, timeout)
@@ -156,7 +141,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
val future = driverActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
- throw new Exception("BlockManagerMaster returned null")
+ throw new SparkException("BlockManagerMaster returned null")
}
return result.asInstanceOf[T]
} catch {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 00aa97bf78..2d05e0ccf1 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -79,10 +79,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
case RemoveRdd(rddId) =>
sender ! removeRdd(rddId)
- case RemoveRddMetaData(rddId, numBlocks) =>
- removeRddMetaData(rddId, numBlocks)
- sender ! true
-
case RemoveBlock(blockId) =>
removeBlockFromWorkers(blockId)
sender ! true
@@ -110,15 +106,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
- // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
- // The dispatcher is used as an implicit argument into the Future sequence construction.
- import context.dispatcher
- Future.sequence(blockManagerInfo.values.map { bm =>
- bm.slaveActor.ask(RemoveRdd(rddId))(akkaTimeout).mapTo[Int]
- }.toSeq)
- }
+ // First remove the metadata for the given RDD, and then asynchronously remove the blocks
+ // from the slaves.
- private def removeRddMetaData(rddId: Int, numBlocks: Int) {
val prefix = "rdd_" + rddId + "_"
// Find all blocks for the given RDD, remove the block from both blockLocations and
// the blockManagerInfo that is tracking the blocks.
@@ -128,6 +118,14 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
blockLocations.remove(blockId)
}
+
+ // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
+ // The dispatcher is used as an implicit argument into the Future sequence construction.
+ import context.dispatcher
+ val removeMsg = RemoveRdd(rddId)
+ Future.sequence(blockManagerInfo.values.map { bm =>
+ bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
+ }.toSeq)
}
private def removeBlockManager(blockManagerId: BlockManagerId) {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 88268fd41b..0010726c8d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -19,9 +19,6 @@ case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
// Remove all blocks belonging to a specific RDD.
private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
-// Remove the meta data for a RDD. This is only sent to the master by the master.
-private[spark] case class RemoveRddMetaData(rddId: Int, numBlocks: Int) extends ToBlockManagerMaster
-
//////////////////////////////////////////////////////////////////////////////////
// Messages from slaves to the master.