aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjeanlyn <jeanlyn92@gmail.com>2016-03-31 12:04:42 -0700
committerAndrew Or <andrew@databricks.com>2016-03-31 12:04:42 -0700
commit8a333d2da859fd593bda183413630bc3757529c9 (patch)
tree646fac41a0dd46838b89966dbff3cc9ccbbf4b9f
parent446c45bd87035e20653394fcaf9dc8caa4299038 (diff)
downloadspark-8a333d2da859fd593bda183413630bc3757529c9.tar.gz
spark-8a333d2da859fd593bda183413630bc3757529c9.tar.bz2
spark-8a333d2da859fd593bda183413630bc3757529c9.zip
[SPARK-14243][CORE] update task metrics when removing blocks
## What changes were proposed in this pull request? This PR try to use `incUpdatedBlockStatuses ` to update the `updatedBlockStatuses ` when removing blocks, making sure `BlockManager` correctly updates `updatedBlockStatuses` ## How was this patch tested? test("updated block statuses") in BlockManagerSuite.scala Author: jeanlyn <jeanlyn92@gmail.com> Closes #12091 from jeanlyn/updateBlock.
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala10
2 files changed, 15 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0c7763f236..3014cafc28 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1264,9 +1264,12 @@ private[spark] class BlockManager(
"the disk, memory, or external block store")
}
blockInfoManager.removeBlock(blockId)
+ val removeBlockStatus = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
- val status = getCurrentBlockStatus(blockId, info)
- reportBlockStatus(blockId, info, status)
+ reportBlockStatus(blockId, info, removeBlockStatus)
+ }
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, removeBlockStatus)))
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6fc32cb30a..9f3a775654 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -928,6 +928,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(!store.diskStore.contains("list3"), "list3 was in disk store")
assert(!store.diskStore.contains("list4"), "list4 was in disk store")
assert(!store.diskStore.contains("list5"), "list5 was in disk store")
+
+ // remove block - list2 should be removed from disk
+ val updatedBlocks6 = getUpdatedBlocks {
+ store.removeBlock(
+ "list2", tellMaster = true)
+ }
+ assert(updatedBlocks6.size === 1)
+ assert(updatedBlocks6.head._1 === TestBlockId("list2"))
+ assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE)
+ assert(!store.diskStore.contains("list2"), "list2 was in disk store")
}
test("query block statuses") {