author     Lukasz <lgieron@gmail.com>                2016-05-25 10:24:21 -0700
committer  Shixiong Zhu <shixiong@databricks.com>    2016-05-25 10:24:21 -0700
commit     b120fba6ae26186b3fa0dfbb1637046f4e76c2b0 (patch)
tree       4f0f9f23a76c2a1c6d2b25ebdc6a3c13d5e7b6d8 /core/src
parent     4f27b8dd58a66fca7ddd4c239e02b90c34b1cebd (diff)
[SPARK-9044] Fix "Storage" tab in UI so that it reflects RDD name change.
## What changes were proposed in this pull request?

1. Make the 'name' field of RDDInfo mutable.
2. In StorageListener, detect that an RDD's name has changed and update it in the cached RDDInfo.

## How was this patch tested?

1. Manual verification: the 'Storage' tab now behaves as expected.
2. The commit also adds a unit test that verifies this.

Author: Lukasz <lgieron@gmail.com>

Closes #13264 from lgieron/SPARK-9044.
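For context, the user-facing scenario this patch addresses is renaming a cached RDD with RDD.setName and expecting the "Storage" tab to follow. The snippet below is a hedged illustration of that scenario, assuming an interactive session with an existing SparkContext named `sc`; it is not part of the patch.

```scala
// Hedged illustration only; assumes a spark-shell style session with an
// existing SparkContext `sc`.
val rdd = sc.parallelize(1 to 100).setName("original_name")
rdd.cache()
rdd.count()              // materializes the cached blocks; "original_name" appears in the Storage tab
rdd.setName("new_name")  // rename the cached RDD
rdd.count()              // once a new stage using this RDD is submitted, the tab should now show "new_name"
```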
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/RDDInfo.scala             2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala       2
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala  17
3 files changed, 19 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 083d78b59e..e5abbf745c 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -24,7 +24,7 @@ import org.apache.spark.util.Utils
 @DeveloperApi
 class RDDInfo(
     val id: Int,
-    val name: String,
+    var name: String,
     val numPartitions: Int,
     var storageLevel: StorageLevel,
     val parentIds: Seq[Int],
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 50095831b4..c212362557 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -59,7 +59,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
     val rddInfos = stageSubmitted.stageInfo.rddInfos
-    rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
+    rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info).name = info.name }
   }

   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
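For reference, here is a minimal standalone sketch of the scala.collection.mutable getOrElseUpdate behaviour that the one-line change above relies on. SimpleRDDInfo is a hypothetical stand-in for RDDInfo used only for illustration: getOrElseUpdate returns the already-registered entry for a known id, so the rename only takes effect because name is reassigned on the returned object.

```scala
import scala.collection.mutable

// Hypothetical stand-in for RDDInfo with just the fields that matter here.
class SimpleRDDInfo(val id: Int, var name: String)

object RenameSketch extends App {
  val rddInfoMap = mutable.HashMap[Int, SimpleRDDInfo]()

  // A first stage submission registers the RDD under its original name.
  val first = new SimpleRDDInfo(0, "original_name")
  rddInfoMap.getOrElseUpdate(first.id, first)

  // A later stage reports the same RDD id under a new name.
  val renamed = new SimpleRDDInfo(0, "new_name")

  // getOrElseUpdate returns the existing entry for id 0 and ignores `renamed`,
  // so without the trailing assignment the rename would be lost.
  rddInfoMap.getOrElseUpdate(renamed.id, renamed).name = renamed.name

  println(rddInfoMap(0).name)  // prints "new_name"
}
```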
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 7d77deeb60..411a0ddebe 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -179,6 +179,23 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     assert(storageListener.rddInfoList.size === 2)
   }

+  test("verify StorageTab still contains a renamed RDD") {
+    val rddInfo = new RDDInfo(0, "original_name", 1, memOnly, Seq(4))
+    val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo), Seq.empty, "details")
+    bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
+    bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+    val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L))
+    postUpdateBlocks(bus, blockUpdateInfos1)
+    assert(storageListener.rddInfoList.size == 1)
+
+    val newName = "new_name"
+    val rddInfoRenamed = new RDDInfo(0, newName, 1, memOnly, Seq(4))
+    val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfoRenamed), Seq.empty, "details")
+    bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
+    assert(storageListener.rddInfoList.size == 1)
+    assert(storageListener.rddInfoList.head.name == newName)
+  }
+
   private def postUpdateBlocks(
       bus: SparkListenerBus, blockUpdateInfos: Seq[BlockUpdatedInfo]): Unit = {
     blockUpdateInfos.foreach { blockUpdateInfo =>