about summary refs log tree commit diff
path: root/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/JsonProtocol.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala  31
1 file changed, 29 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index adf69a4e78..a078f14af5 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -92,8 +92,8 @@ private[spark] object JsonProtocol {
executorRemovedToJson(executorRemoved)
case logStart: SparkListenerLogStart =>
logStartToJson(logStart)
- // These aren't used, but keeps compiler happy
- case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
+ case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+ executorMetricsUpdateToJson(metricsUpdate)
}
}
@@ -224,6 +224,19 @@ private[spark] object JsonProtocol {
("Spark Version" -> SPARK_VERSION)
}
+ def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
+ val execId = metricsUpdate.execId
+ val taskMetrics = metricsUpdate.taskMetrics
+ ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
+ ("Executor ID" -> execId) ~
+ ("Metrics Updated" -> taskMetrics.map { case (taskId, stageId, stageAttemptId, metrics) =>
+ ("Task ID" -> taskId) ~
+ ("Stage ID" -> stageId) ~
+ ("Stage Attempt ID" -> stageAttemptId) ~
+ ("Task Metrics" -> taskMetricsToJson(metrics))
+ })
+ }
+
/** ------------------------------------------------------------------- *
* JSON serialization methods for classes SparkListenerEvents depend on |
* -------------------------------------------------------------------- */
@@ -463,6 +476,7 @@ private[spark] object JsonProtocol {
val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
+ val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -481,6 +495,7 @@ private[spark] object JsonProtocol {
case `executorAdded` => executorAddedFromJson(json)
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
+ case `metricsUpdate` => executorMetricsUpdateFromJson(json)
}
}
@@ -598,6 +613,18 @@ private[spark] object JsonProtocol {
SparkListenerLogStart(sparkVersion)
}
+ def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
+ val execInfo = (json \ "Executor ID").extract[String]
+ val taskMetrics = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
+ val taskId = (json \ "Task ID").extract[Long]
+ val stageId = (json \ "Stage ID").extract[Int]
+ val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
+ val metrics = taskMetricsFromJson(json \ "Task Metrics")
+ (taskId, stageId, stageAttemptId, metrics)
+ }
+ SparkListenerExecutorMetricsUpdate(execInfo, taskMetrics)
+ }
+
/** --------------------------------------------------------------------- *
* JSON deserialization methods for classes SparkListenerEvents depend on |
* ---------------------------------------------------------------------- */