author     Josh Rosen <joshrosen@databricks.com>    2016-11-17 18:45:15 -0800
committer  Reynold Xin <rxin@databricks.com>        2016-11-17 18:45:15 -0800
commit     d9dd979d170f44383a9a87f892f2486ddb3cca7d (patch)
tree       d2ec15d29b17fec2835674ef5a8f7a5e40aafb13
parent     ce13c2672318242748f7520ed4ce6bcfad4fb428 (diff)
[SPARK-18462] Fix ClassCastException in SparkListenerDriverAccumUpdates event
## What changes were proposed in this pull request?

This patch fixes a `ClassCastException: java.lang.Integer cannot be cast to java.lang.Long` error that could occur in the HistoryServer while processing a deserialized `SparkListenerDriverAccumUpdates` event. The problem stems from how `jackson-module-scala` handles primitive type parameters (see https://github.com/FasterXML/jackson-module-scala/wiki/FAQ#deserializing-optionint-and-other-primitive-challenges for details): our code expected a field to be deserialized as a `(Long, Long)` tuple, but we got an `(Int, Int)` tuple instead. This patch works around the issue by registering a custom `Converter` with Jackson that deserializes the tuples as `(Object, Object)` and performs the appropriate casting.

## How was this patch tested?

New regression tests in `SQLListenerSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15922 from JoshRosen/SPARK-18462.
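For context, the erasure problem described above can be reproduced outside of Spark with a minimal sketch (assumes `jackson-databind` and `jackson-module-scala` on the classpath; the `Updates` case class and `TupleErasureRepro` object are illustrative names only, not Spark code, and the exact behavior depends on the module versions in use):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical stand-in for the event class; not part of Spark.
case class Updates(accumUpdates: Seq[(Long, Long)])

object TupleErasureRepro {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

    // Erasure hides the tuple's Long type parameters from Jackson, so small
    // numeric literals may be deserialized as boxed java.lang.Integer values.
    val parsed = mapper.readValue("""{"accumUpdates": [[2, 3]]}""", classOf[Updates])

    // The failure only surfaces when an element is unboxed as a Long:
    parsed.accumUpdates.foreach { case (a, b) =>
      println(a + b) // may throw ClassCastException: Integer cannot be cast to Long
    }
  }
}
```

The fix below sidesteps this by telling Jackson, via `@JsonDeserialize(contentConverter = ...)`, to deserialize each tuple element pair as `(Object, Object)` and convert it to `(Long, Long)` explicitly.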
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala       39
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala  44
2 files changed, 80 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 60f13432d7..5daf21595d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,6 +19,11 @@ package org.apache.spark.sql.execution.ui
import scala.collection.mutable
+import com.fasterxml.jackson.databind.JavaType
+import com.fasterxml.jackson.databind.`type`.TypeFactory
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.databind.util.Converter
+
import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
@@ -43,9 +48,41 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
+case class SparkListenerDriverAccumUpdates(
+ executionId: Long,
+ @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])
+ accumUpdates: Seq[(Long, Long)])
extends SparkListenerEvent
+/**
+ * Jackson [[Converter]] for converting an (Int, Int) tuple into a (Long, Long) tuple.
+ *
+ * This is necessary due to limitations in how Jackson's scala module deserializes primitives;
+ * see the "Deserializing Option[Int] and other primitive challenges" section in
+ * https://github.com/FasterXML/jackson-module-scala/wiki/FAQ for a discussion of this issue and
+ * SPARK-18462 for the specific problem that motivated this conversion.
+ */
+private class LongLongTupleConverter extends Converter[(Object, Object), (Long, Long)] {
+
+ override def convert(in: (Object, Object)): (Long, Long) = {
+ def toLong(a: Object): Long = a match {
+ case i: java.lang.Integer => i.intValue()
+ case l: java.lang.Long => l.longValue()
+ }
+ (toLong(in._1), toLong(in._2))
+ }
+
+ override def getInputType(typeFactory: TypeFactory): JavaType = {
+ val objectType = typeFactory.uncheckedSimpleType(classOf[Object])
+ typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(objectType, objectType))
+ }
+
+ override def getOutputType(typeFactory: TypeFactory): JavaType = {
+ val longType = typeFactory.uncheckedSimpleType(classOf[Long])
+ typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
+ }
+}
+
class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 948a155457..8aea112897 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui
import java.util.Properties
+import org.json4s.jackson.JsonMethods._
import org.mockito.Mockito.mock
import org.apache.spark._
@@ -35,10 +36,10 @@ import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanIn
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
+import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
-class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
+class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
import testImplicits._
import org.apache.spark.AccumulatorSuite.makeInfo
@@ -416,6 +417,45 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
}
+ test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
+ val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
+ val json = JsonProtocol.sparkEventToJson(event)
+ assertValidDataInJson(json,
+ parse("""
+ |{
+ | "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+ | "executionId": 1,
+ | "accumUpdates": [[2,3]]
+ |}
+ """.stripMargin))
+ JsonProtocol.sparkEventFromJson(json) match {
+ case SparkListenerDriverAccumUpdates(executionId, accums) =>
+ assert(executionId == 1L)
+ accums.foreach { case (a, b) =>
+ assert(a == 2L)
+ assert(b == 3L)
+ }
+ }
+
+ // Test a case where the numbers in the JSON can only fit in longs:
+ val longJson = parse(
+ """
+ |{
+ | "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+ | "executionId": 4294967294,
+ | "accumUpdates": [[4294967294,3]]
+ |}
+ """.stripMargin)
+ JsonProtocol.sparkEventFromJson(longJson) match {
+ case SparkListenerDriverAccumUpdates(executionId, accums) =>
+ assert(executionId == 4294967294L)
+ accums.foreach { case (a, b) =>
+ assert(a == 4294967294L)
+ assert(b == 3L)
+ }
+ }
+ }
+
}