about | summary | refs | log | tree | commit | diff
path: root/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala 84
1 files changed, 80 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 4bc1b407ad..7897fade2d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -81,6 +81,14 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
+ /**
+ * Check whether the block manager master knows about a block. Note that this only detects
+ * blocks that have been reported to the block manager master.
+ */
+ def contains(blockId: BlockId) = {
+ !getLocations(blockId).isEmpty
+ }
+
/** Get ids of other nodes in the cluster from the driver */
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
@@ -99,12 +107,10 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
askDriverWithReply(RemoveBlock(blockId))
}
- /**
- * Remove all blocks belonging to the given RDD.
- */
+ /** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
- future onFailure {
+ future.onFailure {
case e: Throwable => logError("Failed to remove RDD " + rddId, e)
}
if (blocking) {
@@ -112,6 +118,31 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
}
}
+ /** Remove all blocks belonging to the given shuffle. */
+ def removeShuffle(shuffleId: Int, blocking: Boolean) {
+ val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
+ future.onFailure {
+ case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
+ }
+ if (blocking) {
+ Await.result(future, timeout)
+ }
+ }
+
+ /** Remove all blocks belonging to the given broadcast. */
+ def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
+ val future = askDriverWithReply[Future[Seq[Int]]](
+ RemoveBroadcast(broadcastId, removeFromMaster))
+ future.onFailure {
+ case e: Throwable =>
+ logError("Failed to remove broadcast " + broadcastId +
+ " with removeFromMaster = " + removeFromMaster, e)
+ }
+ if (blocking) {
+ Await.result(future, timeout)
+ }
+ }
+
/**
* Return the memory status for each block manager, in the form of a map from
* the block manager's id to two long values. The first value is the maximum
@@ -126,6 +157,51 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
askDriverWithReply[Array[StorageStatus]](GetStorageStatus)
}
+ /**
+ * Return the block's status on all block managers, if any. NOTE: This is a
+ * potentially expensive operation and should only be used for testing.
+ *
+ * If askSlaves is true, this invokes the master to query each block manager for the most
+ * up-to-date block statuses. This is useful when the master has not been informed of the
+ * given block by all block managers.
+ */
+ def getBlockStatus(
+ blockId: BlockId,
+ askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
+ val msg = GetBlockStatus(blockId, askSlaves)
+ /*
+ * To avoid potential deadlocks, the use of Futures is necessary, because the master actor
+ * should not block while waiting for a block manager, which can in turn be waiting on the
+ * master actor for a response to a prior message.
+ */
+ val response = askDriverWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
+ val (blockManagerIds, futures) = response.unzip
+ val result = Await.result(Future.sequence(futures), timeout)
+ if (result == null) {
+ throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
+ }
+ val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
+ blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
+ status.map { s => (blockManagerId, s) }
+ }.toMap
+ }
+
+ /**
+ * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This
+ * is a potentially expensive operation and should only be used for testing.
+ *
+ * If askSlaves is true, this invokes the master to query each block manager for the most
+ * up-to-date block statuses. This is useful when the master has not been informed of every
+ * matching block by all block managers.
+ */
+ def getMatchingBlockIds(
+ filter: BlockId => Boolean,
+ askSlaves: Boolean): Seq[BlockId] = {
+ val msg = GetMatchingBlockIds(filter, askSlaves)
+ val future = askDriverWithReply[Future[Seq[BlockId]]](msg)
+ Await.result(future, timeout)
+ }
+
/** Stop the driver actor, called only on the Spark driver node */
def stop() {
if (driverActor != null) {