aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorjeanlyn <jeanlyn92@gmail.com>2016-03-28 16:56:25 -0700
committerAndrew Or <andrew@databricks.com>2016-03-28 16:56:25 -0700
commitad9e3d50f71b096872806a86d89c03a208b1cf8b (patch)
treed292c8bd36e985878a6f7dd99687e3507d77e5e2 /core/src/main/scala/org/apache
parenta916d2a454b63a4c234b1e0b5bf9c5b212bd37fa (diff)
downloadspark-ad9e3d50f71b096872806a86d89c03a208b1cf8b.tar.gz
spark-ad9e3d50f71b096872806a86d89c03a208b1cf8b.tar.bz2
spark-ad9e3d50f71b096872806a86d89c03a208b1cf8b.zip
[SPARK-13845][CORE] Using onBlockUpdated to replace onTaskEnd, avoiding driver OOM
## What changes were proposed in this pull request? We have a streaming job using `FlumePollInputStream` whose driver always runs out of memory (OOM) after a few days. Here is a driver heap dump taken before the OOM: ``` num #instances #bytes class name ---------------------------------------------- 1: 13845916 553836640 org.apache.spark.storage.BlockStatus 2: 14020324 336487776 org.apache.spark.storage.StreamBlockId 3: 13883881 333213144 scala.collection.mutable.DefaultEntry 4: 8907 89043952 [Lscala.collection.mutable.HashEntry; 5: 62360 65107352 [B 6: 163368 24453904 [Ljava.lang.Object; 7: 293651 20342664 [C ... ``` The number of `BlockStatus` and `StreamBlockId` instances keeps growing, and the driver eventually hits an OOM. After investigating, I found that `executorIdToStorageStatus` in `StorageStatusListener` never seems to remove blocks from `StorageStatus`. To fix the issue, I use `onBlockUpdated` to replace `onTaskEnd`, so that block information is updated in time (adding blocks, dropping blocks from memory to disk, and deleting blocks). ## How was this patch tested? Existing unit tests and manual tests Author: jeanlyn <jeanlyn92@gmail.com> Closes #11779 from jeanlyn/fix_driver_oom.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala21
2 files changed, 20 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index f552b498a7..3008520f61 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -66,17 +66,6 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
- val info = taskEnd.taskInfo
- val metrics = taskEnd.taskMetrics
- if (info != null && metrics != null) {
- val updatedBlocks = metrics.updatedBlockStatuses
- if (updatedBlocks.length > 0) {
- updateStorageStatus(info.executorId, updatedBlocks)
- }
- }
- }
-
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
updateStorageStatus(unpersistRDD.rddId)
}
@@ -102,4 +91,14 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
}
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ val executorId = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
+ val blockId = blockUpdated.blockUpdatedInfo.blockId
+ val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+ val memSize = blockUpdated.blockUpdatedInfo.memSize
+ val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+ val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+ updateStorageStatus(executorId, Seq((blockId, blockStatus)))
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 8f75b586e1..50095831b4 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -57,17 +57,6 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
}
- /**
- * Assumes the storage status list is fully up-to-date. This implies the corresponding
- * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
- */
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
- val metrics = taskEnd.taskMetrics
- if (metrics != null && metrics.updatedBlockStatuses.nonEmpty) {
- updateRDDInfo(metrics.updatedBlockStatuses)
- }
- }
-
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val rddInfos = stageSubmitted.stageInfo.rddInfos
rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
@@ -84,4 +73,14 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
_rddInfoMap.remove(unpersistRDD.rddId)
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ super.onBlockUpdated(blockUpdated)
+ val blockId = blockUpdated.blockUpdatedInfo.blockId
+ val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+ val memSize = blockUpdated.blockUpdatedInfo.memSize
+ val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+ val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+ updateRDDInfo(Seq((blockId, blockStatus)))
+ }
}