From f67da43c394c27ceb4e6bfd49e81be05e406aa29 Mon Sep 17 00:00:00 2001 From: Ben Date: Tue, 21 Jul 2015 09:51:13 -0700 Subject: [SPARK-9036] [CORE] SparkListenerExecutorMetricsUpdate messages not included in JsonProtocol This PR implements a JSON serializer and deserializer in the JSONProtocol to handle the (de)serialization of SparkListenerExecutorMetricsUpdate events. It also includes a unit test in the JSONProtocolSuite file. This was implemented to satisfy the improvement request in the JIRA issue SPARK-9036. Author: Ben Closes #7555 from NamelessAnalyst/master and squashes the following commits: fb4e3cc [Ben] Update JSON Protocol and tests aa69517 [Ben] Update JSON Protocol and tests --Corrected Stage Attempt to Stage Attempt ID 33e5774 [Ben] Update JSON Protocol Tests 3f237e7 [Ben] Update JSON Protocol Tests 84ca798 [Ben] Update JSON Protocol Tests cde57a0 [Ben] Update JSON Protocol Tests 8049600 [Ben] Update JSON Protocol Tests c5bc061 [Ben] Update JSON Protocol Tests 6f25785 [Ben] Merge remote-tracking branch 'origin/master' df2a609 [Ben] Update JSON Protocol dcda80b [Ben] Update JSON Protocol --- .../scala/org/apache/spark/util/JsonProtocol.scala | 31 +++++++++- .../org/apache/spark/util/JsonProtocolSuite.scala | 69 +++++++++++++++++++++- 2 files changed, 96 insertions(+), 4 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index adf69a4e78..a078f14af5 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -92,8 +92,8 @@ private[spark] object JsonProtocol { executorRemovedToJson(executorRemoved) case logStart: SparkListenerLogStart => logStartToJson(logStart) - // These aren't used, but keeps compiler happy - case SparkListenerExecutorMetricsUpdate(_, _) => JNothing + case metricsUpdate: SparkListenerExecutorMetricsUpdate => + executorMetricsUpdateToJson(metricsUpdate) } } @@ -224,6 +224,19 @@ private[spark] object JsonProtocol { ("Spark Version" -> SPARK_VERSION) } + def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = { + val execId = metricsUpdate.execId + val taskMetrics = metricsUpdate.taskMetrics + ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~ + ("Executor ID" -> execId) ~ + ("Metrics Updated" -> taskMetrics.map { case (taskId, stageId, stageAttemptId, metrics) => + ("Task ID" -> taskId) ~ + ("Stage ID" -> stageId) ~ + ("Stage Attempt ID" -> stageAttemptId) ~ + ("Task Metrics" -> taskMetricsToJson(metrics)) + }) + } + /** ------------------------------------------------------------------- * * JSON serialization methods for classes SparkListenerEvents depend on | * -------------------------------------------------------------------- */ @@ -463,6 +476,7 @@ private[spark] object JsonProtocol { val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded) val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved) val logStart = Utils.getFormattedClassName(SparkListenerLogStart) + val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate) (json \ "Event").extract[String] match { case `stageSubmitted` => stageSubmittedFromJson(json) @@ -481,6 +495,7 @@ private[spark] object JsonProtocol { case `executorAdded` => executorAddedFromJson(json) case `executorRemoved` => executorRemovedFromJson(json) case `logStart` => logStartFromJson(json) + case `metricsUpdate` => executorMetricsUpdateFromJson(json) } } @@ -598,6 +613,18 @@ private[spark] object JsonProtocol { SparkListenerLogStart(sparkVersion) } + def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = { + val execInfo = (json \ "Executor ID").extract[String] + val taskMetrics = (json \ "Metrics Updated").extract[List[JValue]].map { json => + val taskId = (json \ "Task ID").extract[Long] + val stageId = (json \ "Stage ID").extract[Int] + val stageAttemptId = (json \ "Stage Attempt ID").extract[Int] + val metrics = taskMetricsFromJson(json \ "Task Metrics") + (taskId, stageId, stageAttemptId, metrics) + } + SparkListenerExecutorMetricsUpdate(execInfo, taskMetrics) + } + /** --------------------------------------------------------------------- * * JSON deserialization methods for classes SparkListenerEvents depend on | * ---------------------------------------------------------------------- */ diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index e0ef9c70a5..dde95f3778 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -83,6 +83,9 @@ class JsonProtocolSuite extends SparkFunSuite { val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1", new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap)) val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason") + val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", Seq( + (1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, + hasHadoopInput = true, hasOutput = true)))) testEvent(stageSubmitted, stageSubmittedJsonString) testEvent(stageCompleted, stageCompletedJsonString) @@ -102,6 +105,7 @@ class JsonProtocolSuite extends SparkFunSuite { testEvent(applicationEnd, applicationEndJsonString) testEvent(executorAdded, executorAddedJsonString) testEvent(executorRemoved, executorRemovedJsonString) + testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString) } test("Dependent Classes") { @@ -440,10 +444,20 @@ class JsonProtocolSuite extends SparkFunSuite { case (e1: SparkListenerEnvironmentUpdate, e2: SparkListenerEnvironmentUpdate) => assertEquals(e1.environmentDetails, e2.environmentDetails) case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) => - assert(e1.executorId == e1.executorId) + assert(e1.executorId === e1.executorId) assertEquals(e1.executorInfo, e2.executorInfo) case (e1: SparkListenerExecutorRemoved, e2: SparkListenerExecutorRemoved) => - assert(e1.executorId == e1.executorId) + assert(e1.executorId === e1.executorId) + case (e1: SparkListenerExecutorMetricsUpdate, e2: SparkListenerExecutorMetricsUpdate) => + assert(e1.execId === e2.execId) + assertSeqEquals[(Long, Int, Int, TaskMetrics)](e1.taskMetrics, e2.taskMetrics, (a, b) => { + val (taskId1, stageId1, stageAttemptId1, metrics1) = a + val (taskId2, stageId2, stageAttemptId2, metrics2) = b + assert(taskId1 === taskId2) + assert(stageId1 === stageId2) + assert(stageAttemptId1 === stageAttemptId2) + assertEquals(metrics1, metrics2) + }) case (e1, e2) => assert(e1 === e2) case _ => fail("Events don't match in types!") @@ -1598,4 +1612,55 @@ class JsonProtocolSuite extends SparkFunSuite { | "Removed Reason": "test reason" |} """ + + private val executorMetricsUpdateJsonString = + s""" + |{ + | "Event": "SparkListenerExecutorMetricsUpdate", + | "Executor ID": "exec3", + | "Metrics Updated": [ + | { + | "Task ID": 1, + | "Stage ID": 2, + | "Stage Attempt ID": 3, + | "Task Metrics": { + | "Host Name": "localhost", + | "Executor Deserialize Time": 300, + | "Executor Run Time": 400, + | "Result Size": 500, + | "JVM GC Time": 600, + | "Result Serialization Time": 700, + | "Memory Bytes Spilled": 800, + | "Disk Bytes Spilled": 0, + | "Input Metrics": { + | "Data Read Method": "Hadoop", + | "Bytes Read": 2100, + | "Records Read": 21 + | }, + | "Output Metrics": { + | "Data Write Method": "Hadoop", + | "Bytes Written": 1200, + | "Records Written": 12 + | }, + | "Updated Blocks": [ + | { + | "Block ID": "rdd_0_0", + | "Status": { + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use ExternalBlockStore": false, + | "Deserialized": false, + | "Replication": 2 + | }, + | "Memory Size": 0, + | "ExternalBlockStore Size": 0, + | "Disk Size": 0 + | } + | } + | ] + | } + | }] + |} + """.stripMargin } -- cgit v1.2.3