diff options
| author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-05-31 15:54:57 -0700 |
|---|---|---|
| committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-05-31 15:54:57 -0700 |
| commit | de1167bf2c32d52c865a4a0c7213b665ebd61f93 (patch) | |
| tree | 014841b36dc99804dcd7cdd13325e386bff2e81a | |
| parent | ba5e544461e8ca9216af703033f6b0de6dbc56ec (diff) | |
| download | spark-de1167bf2c32d52c865a4a0c7213b665ebd61f93.tar.gz spark-de1167bf2c32d52c865a4a0c7213b665ebd61f93.tar.bz2 spark-de1167bf2c32d52c865a4a0c7213b665ebd61f93.zip | |
Incorporated Charles' feedback to put rdd metadata removal in
BlockManagerMasterActor.
3 files changed, 13 insertions, 33 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 7099e40618..58888b1ebb 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -84,24 +84,9 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
    * Remove all blocks belonging to the given RDD.
    */
   def removeRdd(rddId: Int, blocking: Boolean) {
-    // The logic to remove an RDD is somewhat complicated:
-    // 1. Send BlockManagerMasterActor a RemoveRdd message.
-    // 2. Upon receiving the RemoveRdd message, BlockManagerMasterActor will forward the message
-    //    to all workers to remove blocks belonging to the RDD, and return a Future for the results.
-    // 3. The Future is sent back here, and on successful completion of the Future, this function
-    //    sends a RemoveRddMetaData message to BlockManagerMasterActor.
-    // 4. Upon receiving the RemoveRddMetaData message, BlockManagerMasterActor will delete the meta
-    //    data for the given RDD.
-    //
-    // The reason we are doing it this way is to reduce the amount of messages the driver sends.
-    // The number of messages that need to be sent is only the number of workers the cluster has,
-    // rather than the number of blocks in the cluster. Note that we can further reduce the number
-    // of messages by tracking for a given RDD, where are its blocks. Then we can send only to the
-    // workers that have the given RDD. But this remains future work.
     val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
-    future onComplete {
-      case Left(throwable) => logError("Failed to remove RDD " + rddId, throwable)
-      case Right(numBlocks) => tell(RemoveRddMetaData(rddId, numBlocks.sum))
+    future onFailure {
+      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
     }
     if (blocking) {
       Await.result(future, timeout)
@@ -156,7 +141,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
       val future = driverActor.ask(message)(timeout)
       val result = Await.result(future, timeout)
       if (result == null) {
-        throw new Exception("BlockManagerMaster returned null")
+        throw new SparkException("BlockManagerMaster returned null")
       }
       return result.asInstanceOf[T]
     } catch {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 00aa97bf78..2d05e0ccf1 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -79,10 +79,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     case RemoveRdd(rddId) =>
       sender ! removeRdd(rddId)
 
-    case RemoveRddMetaData(rddId, numBlocks) =>
-      removeRddMetaData(rddId, numBlocks)
-      sender ! true
-
     case RemoveBlock(blockId) =>
       removeBlockFromWorkers(blockId)
       sender ! true
@@ -110,15 +106,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
   }
 
   private def removeRdd(rddId: Int): Future[Seq[Int]] = {
-    // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
-    // The dispatcher is used as an implicit argument into the Future sequence construction.
-    import context.dispatcher
-    Future.sequence(blockManagerInfo.values.map { bm =>
-      bm.slaveActor.ask(RemoveRdd(rddId))(akkaTimeout).mapTo[Int]
-    }.toSeq)
-  }
+    // First remove the metadata for the given RDD, and then asynchronously remove the blocks
+    // from the slaves.
 
-  private def removeRddMetaData(rddId: Int, numBlocks: Int) {
     val prefix = "rdd_" + rddId + "_"
     // Find all blocks for the given RDD, remove the block from both blockLocations and
     // the blockManagerInfo that is tracking the blocks.
@@ -128,6 +118,14 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
       blockLocations.remove(blockId)
     }
+
+    // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
+    // The dispatcher is used as an implicit argument into the Future sequence construction.
+    import context.dispatcher
+    val removeMsg = RemoveRdd(rddId)
+    Future.sequence(blockManagerInfo.values.map { bm =>
+      bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
+    }.toSeq)
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 88268fd41b..0010726c8d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -19,9 +19,6 @@ case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
 // Remove all blocks belonging to a specific RDD.
 private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
 
-// Remove the meta data for a RDD. This is only sent to the master by the master.
-private[spark] case class RemoveRddMetaData(rddId: Int, numBlocks: Int) extends ToBlockManagerMaster
-
 //////////////////////////////////////////////////////////////////////////////////
 // Messages from slaves to the master.