aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorjeanlyn <jeanlyn92@gmail.com>2016-03-28 16:56:25 -0700
committerAndrew Or <andrew@databricks.com>2016-03-28 16:56:25 -0700
commitad9e3d50f71b096872806a86d89c03a208b1cf8b (patch)
treed292c8bd36e985878a6f7dd99687e3507d77e5e2 /core
parenta916d2a454b63a4c234b1e0b5bf9c5b212bd37fa (diff)
downloadspark-ad9e3d50f71b096872806a86d89c03a208b1cf8b.tar.gz
spark-ad9e3d50f71b096872806a86d89c03a208b1cf8b.tar.bz2
spark-ad9e3d50f71b096872806a86d89c03a208b1cf8b.zip
[SPARK-13845][CORE] Using onBlockUpdated to replace onTaskEnd avioding driver OOM
## What changes were proposed in this pull request? We have a streaming job using `FlumePollInputStream` always driver OOM after few days, here is some driver heap dump before OOM ``` num #instances #bytes class name ---------------------------------------------- 1: 13845916 553836640 org.apache.spark.storage.BlockStatus 2: 14020324 336487776 org.apache.spark.storage.StreamBlockId 3: 13883881 333213144 scala.collection.mutable.DefaultEntry 4: 8907 89043952 [Lscala.collection.mutable.HashEntry; 5: 62360 65107352 [B 6: 163368 24453904 [Ljava.lang.Object; 7: 293651 20342664 [C ... ``` `BlockStatus` and `StreamBlockId` keep on growing, and the driver OOM in the end. After investigated, i found the `executorIdToStorageStatus` in `StorageStatusListener` seems never remove the blocks from `StorageStatus`. In order to fix the issue, i try to use `onBlockUpdated` replace `onTaskEnd ` , so we can update the block informations(add blocks, drop the block from memory to disk and delete the blocks) in time. ## How was this patch tested? Existing unit tests and manual tests Author: jeanlyn <jeanlyn92@gmail.com> Closes #11779 from jeanlyn/fix_driver_oom.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala21
-rw-r--r--core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json4
-rw-r--r--core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json10
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala67
-rw-r--r--core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala58
7 files changed, 91 insertions, 95 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index f552b498a7..3008520f61 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -66,17 +66,6 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
- val info = taskEnd.taskInfo
- val metrics = taskEnd.taskMetrics
- if (info != null && metrics != null) {
- val updatedBlocks = metrics.updatedBlockStatuses
- if (updatedBlocks.length > 0) {
- updateStorageStatus(info.executorId, updatedBlocks)
- }
- }
- }
-
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
updateStorageStatus(unpersistRDD.rddId)
}
@@ -102,4 +91,14 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
}
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ val executorId = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
+ val blockId = blockUpdated.blockUpdatedInfo.blockId
+ val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+ val memSize = blockUpdated.blockUpdatedInfo.memSize
+ val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+ val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+ updateStorageStatus(executorId, Seq((blockId, blockStatus)))
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 8f75b586e1..50095831b4 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -57,17 +57,6 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
}
- /**
- * Assumes the storage status list is fully up-to-date. This implies the corresponding
- * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
- */
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
- val metrics = taskEnd.taskMetrics
- if (metrics != null && metrics.updatedBlockStatuses.nonEmpty) {
- updateRDDInfo(metrics.updatedBlockStatuses)
- }
- }
-
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val rddInfos = stageSubmitted.stageInfo.rddInfos
rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
@@ -84,4 +73,14 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
_rddInfoMap.remove(unpersistRDD.rddId)
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ super.onBlockUpdated(blockUpdated)
+ val blockId = blockUpdated.blockUpdatedInfo.blockId
+ val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+ val memSize = blockUpdated.blockUpdatedInfo.memSize
+ val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+ val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+ updateRDDInfo(Seq((blockId, blockStatus)))
+ }
}
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index 4a88eeee74..efc865919b 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -2,8 +2,8 @@
"id" : "<driver>",
"hostPort" : "localhost:57971",
"isActive" : true,
- "rddBlocks" : 8,
- "memoryUsed" : 28000128,
+ "rddBlocks" : 0,
+ "memoryUsed" : 0,
"diskUsed" : 0,
"totalCores" : 0,
"maxTasks" : 0,
diff --git a/core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json
index f79a31022d..8878e547a7 100644
--- a/core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json
@@ -1,9 +1 @@
-[ {
- "id" : 0,
- "name" : "0",
- "numPartitions" : 8,
- "numCachedPartitions" : 8,
- "storageLevel" : "Memory Deserialized 1x Replicated",
- "memoryUsed" : 28000128,
- "diskUsed" : 0
-} ] \ No newline at end of file
+[ ] \ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 5822261d8d..79e4efb1a8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -140,8 +140,9 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
"stage task list from multi-attempt app json(2)" ->
"applications/local-1426533911241/2/stages/0/0/taskList",
- "rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
- "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
+ "rdd list storage json" -> "applications/local-1422981780767/storage/rdd"
+ // Todo: enable this test when logging the even of onBlockUpdated. See: SPARK-13845
+ // "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
)
// run a bunch of characterization tests -- just verify the behavior is the same as what is saved
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 14daa003bc..9835f11a2f 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -82,48 +82,51 @@ class StorageStatusListenerSuite extends SparkFunSuite {
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
}
- test("task end with updated blocks") {
+ test("updated blocks") {
val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
- val taskMetrics1 = new TaskMetrics
- val taskMetrics2 = new TaskMetrics
- val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
- val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
- val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
- taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2))
- taskMetrics2.setUpdatedBlockStatuses(Seq(block3))
-
- // Task end with new blocks
+
+ val blockUpdateInfos1 = Seq(
+ BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L),
+ BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L)
+ )
+ val blockUpdateInfos2 =
+ Seq(BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L))
+
+ // Add some new blocks
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
+ postUpdateBlock(listener, blockUpdateInfos1)
assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
+ postUpdateBlock(listener, blockUpdateInfos2)
assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
- // Task end with dropped blocks
- val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L))
- val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L))
- val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L))
- taskMetrics1.setUpdatedBlockStatuses(Seq(droppedBlock1, droppedBlock3))
- taskMetrics2.setUpdatedBlockStatuses(Seq(droppedBlock2, droppedBlock3))
+ // Dropped the blocks
+ val droppedBlockInfo1 = Seq(
+ BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.NONE, 0L, 0L),
+ BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L)
+ )
+ val droppedBlockInfo2 = Seq(
+ BlockUpdatedInfo(bm2, RDDBlockId(1, 2), StorageLevel.NONE, 0L, 0L),
+ BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L)
+ )
- listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
+ postUpdateBlock(listener, droppedBlockInfo1)
assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
- listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
+ postUpdateBlock(listener, droppedBlockInfo2)
assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
@@ -134,15 +137,14 @@ class StorageStatusListenerSuite extends SparkFunSuite {
test("unpersist RDD") {
val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
- val taskMetrics1 = new TaskMetrics
- val taskMetrics2 = new TaskMetrics
- val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
- val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
- val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
- taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2))
- taskMetrics2.setUpdatedBlockStatuses(Seq(block3))
- listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
- listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2))
+ val blockUpdateInfos1 = Seq(
+ BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L),
+ BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L)
+ )
+ val blockUpdateInfos2 =
+ Seq(BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L))
+ postUpdateBlock(listener, blockUpdateInfos1)
+ postUpdateBlock(listener, blockUpdateInfos2)
assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
// Unpersist RDD
@@ -155,4 +157,11 @@ class StorageStatusListenerSuite extends SparkFunSuite {
listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
}
+
+ private def postUpdateBlock(
+ listener: StorageStatusListener, updateBlockInfos: Seq[BlockUpdatedInfo]): Unit = {
+ updateBlockInfos.foreach { updateBlockInfo =>
+ listener.onBlockUpdated(SparkListenerBlockUpdated(updateBlockInfo))
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 6b7c538ac8..7d77deeb60 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -106,7 +106,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
assert(storageListener.rddInfoList.size === 0)
}
- test("task end") {
+ test("block update") {
val myRddInfo0 = rddInfo0
val myRddInfo1 = rddInfo1
val myRddInfo2 = rddInfo2
@@ -120,19 +120,13 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
assert(!storageListener._rddInfoMap(1).isCached)
assert(!storageListener._rddInfoMap(2).isCached)
- // Task end with no updated blocks. This should not change anything.
- bus.postToAll(SparkListenerTaskEnd(0, 0, "obliteration", Success, taskInfo, new TaskMetrics))
- assert(storageListener._rddInfoMap.size === 3)
- assert(storageListener.rddInfoList.size === 0)
-
- // Task end with a few new persisted blocks, some from the same RDD
- val metrics1 = new TaskMetrics
- metrics1.setUpdatedBlockStatuses(Seq(
- (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L)),
- (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L)),
- (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L))
- ))
- bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
+ // Some blocks updated
+ val blockUpdateInfos = Seq(
+ BlockUpdatedInfo(bm1, RDDBlockId(0, 100), memAndDisk, 400L, 0L),
+ BlockUpdatedInfo(bm1, RDDBlockId(0, 101), memAndDisk, 0L, 400L),
+ BlockUpdatedInfo(bm1, RDDBlockId(1, 20), memAndDisk, 0L, 240L)
+ )
+ postUpdateBlocks(bus, blockUpdateInfos)
assert(storageListener._rddInfoMap(0).memSize === 400L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
@@ -144,15 +138,14 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
assert(!storageListener._rddInfoMap(2).isCached)
assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
- // Task end with a few dropped blocks
- val metrics2 = new TaskMetrics
- metrics2.setUpdatedBlockStatuses(Seq(
- (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L)),
- (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L)),
- (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L)), // doesn't actually exist
- (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L)) // doesn't actually exist
- ))
- bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
+ // Drop some blocks
+ val blockUpdateInfos2 = Seq(
+ BlockUpdatedInfo(bm1, RDDBlockId(0, 100), none, 0L, 0L),
+ BlockUpdatedInfo(bm1, RDDBlockId(1, 20), none, 0L, 0L),
+ BlockUpdatedInfo(bm1, RDDBlockId(2, 40), none, 0L, 0L), // doesn't actually exist
+ BlockUpdatedInfo(bm1, RDDBlockId(4, 80), none, 0L, 0L) // doesn't actually exist
+ )
+ postUpdateBlocks(bus, blockUpdateInfos2)
assert(storageListener._rddInfoMap(0).memSize === 0L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
assert(storageListener._rddInfoMap(0).numCachedPartitions === 1)
@@ -169,24 +162,27 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, Seq(4))
val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
- val taskMetrics0 = new TaskMetrics
- val taskMetrics1 = new TaskMetrics
- val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L))
- val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L))
- taskMetrics0.setUpdatedBlockStatuses(Seq(block0))
- taskMetrics1.setUpdatedBlockStatuses(Seq(block1))
+ val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L))
+ val blockUpdateInfos2 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(1, 1), memOnly, 200L, 0L))
bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener.rddInfoList.size === 0)
- bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
+ postUpdateBlocks(bus, blockUpdateInfos1)
assert(storageListener.rddInfoList.size === 1)
bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
assert(storageListener.rddInfoList.size === 1)
bus.postToAll(SparkListenerStageCompleted(stageInfo0))
assert(storageListener.rddInfoList.size === 1)
- bus.postToAll(SparkListenerTaskEnd(1, 0, "small", Success, taskInfo1, taskMetrics1))
+ postUpdateBlocks(bus, blockUpdateInfos2)
assert(storageListener.rddInfoList.size === 2)
bus.postToAll(SparkListenerStageCompleted(stageInfo1))
assert(storageListener.rddInfoList.size === 2)
}
+
+ private def postUpdateBlocks(
+ bus: SparkListenerBus, blockUpdateInfos: Seq[BlockUpdatedInfo]): Unit = {
+ blockUpdateInfos.foreach { blockUpdateInfo =>
+ bus.postToAll(SparkListenerBlockUpdated(blockUpdateInfo))
+ }
+ }
}