about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorRyan Blue <blue@apache.org>2017-03-31 09:42:49 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2017-03-31 09:42:49 -0700
commitc4c03eed67c05a78dc8944f6119ea708d6b955be (patch)
tree8f50bc293129fa85fc104e29dd260f045e9bf6f3 /core
parent254877c2f04414c70d92fa0a00c0ecee1d73aba7 (diff)
downloadspark-c4c03eed67c05a78dc8944f6119ea708d6b955be.tar.gz
spark-c4c03eed67c05a78dc8944f6119ea708d6b955be.tar.bz2
spark-c4c03eed67c05a78dc8944f6119ea708d6b955be.zip
[SPARK-20084][CORE] Remove internal.metrics.updatedBlockStatuses from history files.
## What changes were proposed in this pull request?

Remove accumulator updates for internal.metrics.updatedBlockStatuses from SparkListenerTaskEnd entries in the history file. These can cause history files to grow to hundreds of GB because the value of the accumulator contains all tracked blocks.

## How was this patch tested?

Current History UI tests cover use of the history file.

Author: Ryan Blue <blue@apache.org>

Closes #17412 from rdblue/SPARK-20084-remove-block-accumulator-info.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 15
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2cb88919c8..1d2cb7acef 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -264,8 +264,7 @@ private[spark] object JsonProtocol {
("Submission Time" -> submissionTime) ~
("Completion Time" -> completionTime) ~
("Failure Reason" -> failureReason) ~
- ("Accumulables" -> JArray(
- stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
+ ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values))
}
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -281,7 +280,15 @@ private[spark] object JsonProtocol {
("Finish Time" -> taskInfo.finishTime) ~
("Failed" -> taskInfo.failed) ~
("Killed" -> taskInfo.killed) ~
- ("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
+ ("Accumulables" -> accumulablesToJson(taskInfo.accumulables))
+ }
+
+ private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")
+
+ def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = {
+ JArray(accumulables
+ .filterNot(_.name.exists(accumulableBlacklist.contains))
+ .toList.map(accumulableInfoToJson))
}
def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
@@ -376,7 +383,7 @@ private[spark] object JsonProtocol {
("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
- val accumUpdates = JArray(exceptionFailure.accumUpdates.map(accumulableInfoToJson).toList)
+ val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates)
("Class Name" -> exceptionFailure.className) ~
("Description" -> exceptionFailure.description) ~
("Stack Trace" -> stackTrace) ~