about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorBen <benjaminpiering@gmail.com>2015-07-21 09:51:13 -0700
committerAndrew Or <andrew@databricks.com>2015-07-21 09:51:13 -0700
commitf67da43c394c27ceb4e6bfd49e81be05e406aa29 (patch)
treeecb06436e24095abbd3dab8bba0a1556733b5433 /core
parent6592a6058eee6a27a5c91281ca19076284d62483 (diff)
downloadspark-f67da43c394c27ceb4e6bfd49e81be05e406aa29.tar.gz
spark-f67da43c394c27ceb4e6bfd49e81be05e406aa29.tar.bz2
spark-f67da43c394c27ceb4e6bfd49e81be05e406aa29.zip
[SPARK-9036] [CORE] SparkListenerExecutorMetricsUpdate messages not included in JsonProtocol
This PR implements a JSON serializer and deserializer in the JSONProtocol to handle the (de)serialization of SparkListenerExecutorMetricsUpdate events. It also includes a unit test in the JSONProtocolSuite file. This was implemented to satisfy the improvement request in the JIRA issue SPARK-9036. Author: Ben <benjaminpiering@gmail.com> Closes #7555 from NamelessAnalyst/master and squashes the following commits: fb4e3cc [Ben] Update JSON Protocol and tests aa69517 [Ben] Update JSON Protocol and tests --Corrected Stage Attempt to Stage Attempt ID 33e5774 [Ben] Update JSON Protocol Tests 3f237e7 [Ben] Update JSON Protocol Tests 84ca798 [Ben] Update JSON Protocol Tests cde57a0 [Ben] Update JSON Protocol Tests 8049600 [Ben] Update JSON Protocol Tests c5bc061 [Ben] Update JSON Protocol Tests 6f25785 [Ben] Merge remote-tracking branch 'origin/master' df2a609 [Ben] Update JSON Protocol dcda80b [Ben] Update JSON Protocol
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala31
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala69
2 files changed, 96 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index adf69a4e78..a078f14af5 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -92,8 +92,8 @@ private[spark] object JsonProtocol {
executorRemovedToJson(executorRemoved)
case logStart: SparkListenerLogStart =>
logStartToJson(logStart)
- // These aren't used, but keeps compiler happy
- case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
+ case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+ executorMetricsUpdateToJson(metricsUpdate)
}
}
@@ -224,6 +224,19 @@ private[spark] object JsonProtocol {
("Spark Version" -> SPARK_VERSION)
}
+ def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
+ val execId = metricsUpdate.execId
+ val taskMetrics = metricsUpdate.taskMetrics
+ ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
+ ("Executor ID" -> execId) ~
+ ("Metrics Updated" -> taskMetrics.map { case (taskId, stageId, stageAttemptId, metrics) =>
+ ("Task ID" -> taskId) ~
+ ("Stage ID" -> stageId) ~
+ ("Stage Attempt ID" -> stageAttemptId) ~
+ ("Task Metrics" -> taskMetricsToJson(metrics))
+ })
+ }
+
/** ------------------------------------------------------------------- *
* JSON serialization methods for classes SparkListenerEvents depend on |
* -------------------------------------------------------------------- */
@@ -463,6 +476,7 @@ private[spark] object JsonProtocol {
val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
+ val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -481,6 +495,7 @@ private[spark] object JsonProtocol {
case `executorAdded` => executorAddedFromJson(json)
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
+ case `metricsUpdate` => executorMetricsUpdateFromJson(json)
}
}
@@ -598,6 +613,18 @@ private[spark] object JsonProtocol {
SparkListenerLogStart(sparkVersion)
}
+ def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
+ val execInfo = (json \ "Executor ID").extract[String]
+ val taskMetrics = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
+ val taskId = (json \ "Task ID").extract[Long]
+ val stageId = (json \ "Stage ID").extract[Int]
+ val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
+ val metrics = taskMetricsFromJson(json \ "Task Metrics")
+ (taskId, stageId, stageAttemptId, metrics)
+ }
+ SparkListenerExecutorMetricsUpdate(execInfo, taskMetrics)
+ }
+
/** --------------------------------------------------------------------- *
* JSON deserialization methods for classes SparkListenerEvents depend on |
* ---------------------------------------------------------------------- */
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index e0ef9c70a5..dde95f3778 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -83,6 +83,9 @@ class JsonProtocolSuite extends SparkFunSuite {
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
+ val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", Seq(
+ (1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800,
+ hasHadoopInput = true, hasOutput = true))))
testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
@@ -102,6 +105,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testEvent(applicationEnd, applicationEndJsonString)
testEvent(executorAdded, executorAddedJsonString)
testEvent(executorRemoved, executorRemovedJsonString)
+ testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString)
}
test("Dependent Classes") {
@@ -440,10 +444,20 @@ class JsonProtocolSuite extends SparkFunSuite {
case (e1: SparkListenerEnvironmentUpdate, e2: SparkListenerEnvironmentUpdate) =>
assertEquals(e1.environmentDetails, e2.environmentDetails)
case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) =>
- assert(e1.executorId == e1.executorId)
+ assert(e1.executorId === e1.executorId)
assertEquals(e1.executorInfo, e2.executorInfo)
case (e1: SparkListenerExecutorRemoved, e2: SparkListenerExecutorRemoved) =>
- assert(e1.executorId == e1.executorId)
+ assert(e1.executorId === e1.executorId)
+ case (e1: SparkListenerExecutorMetricsUpdate, e2: SparkListenerExecutorMetricsUpdate) =>
+ assert(e1.execId === e2.execId)
+ assertSeqEquals[(Long, Int, Int, TaskMetrics)](e1.taskMetrics, e2.taskMetrics, (a, b) => {
+ val (taskId1, stageId1, stageAttemptId1, metrics1) = a
+ val (taskId2, stageId2, stageAttemptId2, metrics2) = b
+ assert(taskId1 === taskId2)
+ assert(stageId1 === stageId2)
+ assert(stageAttemptId1 === stageAttemptId2)
+ assertEquals(metrics1, metrics2)
+ })
case (e1, e2) =>
assert(e1 === e2)
case _ => fail("Events don't match in types!")
@@ -1598,4 +1612,55 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Removed Reason": "test reason"
|}
"""
+
+ private val executorMetricsUpdateJsonString =
+ s"""
+ |{
+ | "Event": "SparkListenerExecutorMetricsUpdate",
+ | "Executor ID": "exec3",
+ | "Metrics Updated": [
+ | {
+ | "Task ID": 1,
+ | "Stage ID": 2,
+ | "Stage Attempt ID": 3,
+ | "Task Metrics": {
+ | "Host Name": "localhost",
+ | "Executor Deserialize Time": 300,
+ | "Executor Run Time": 400,
+ | "Result Size": 500,
+ | "JVM GC Time": 600,
+ | "Result Serialization Time": 700,
+ | "Memory Bytes Spilled": 800,
+ | "Disk Bytes Spilled": 0,
+ | "Input Metrics": {
+ | "Data Read Method": "Hadoop",
+ | "Bytes Read": 2100,
+ | "Records Read": 21
+ | },
+ | "Output Metrics": {
+ | "Data Write Method": "Hadoop",
+ | "Bytes Written": 1200,
+ | "Records Written": 12
+ | },
+ | "Updated Blocks": [
+ | {
+ | "Block ID": "rdd_0_0",
+ | "Status": {
+ | "Storage Level": {
+ | "Use Disk": true,
+ | "Use Memory": true,
+ | "Use ExternalBlockStore": false,
+ | "Deserialized": false,
+ | "Replication": 2
+ | },
+ | "Memory Size": 0,
+ | "ExternalBlockStore Size": 0,
+ | "Disk Size": 0
+ | }
+ | }
+ | ]
+ | }
+ | }]
+ |}
+ """.stripMargin
}