about summary refs log tree commit diff
path: root/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/JsonProtocol.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala11
1 file changed, 9 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c9beeb25e0..7f5d713ec6 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,19 +19,21 @@ package org.apache.spark.util
import java.util.{Properties, UUID}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-
import scala.collection.JavaConverters._
import scala.collection.Map
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._
/**
@@ -54,6 +56,8 @@ private[spark] object JsonProtocol {
private implicit val format = DefaultFormats
+ private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */
@@ -96,6 +100,7 @@ private[spark] object JsonProtocol {
executorMetricsUpdateToJson(metricsUpdate)
case blockUpdated: SparkListenerBlockUpdated =>
throw new MatchError(blockUpdated) // TODO(ekl) implement this
+ case _ => parse(mapper.writeValueAsString(event))
}
}
@@ -511,6 +516,8 @@ private[spark] object JsonProtocol {
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+ case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
+ .asInstanceOf[SparkListenerEvent]
}
}