aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSean Zhong <seanzhong@databricks.com>2016-06-13 17:43:55 -0700
committerWenchen Fan <wenchen@databricks.com>2016-06-13 17:43:55 -0700
commit7b9071eeaa62fd9a51d9e94cfd479224b8341517 (patch)
tree8d1b27e06fd8f397b400330062daf53d856ec8c5
parenta6babca1bf76e70488ce6005ec3b8b53afc7edfd (diff)
downloadspark-7b9071eeaa62fd9a51d9e94cfd479224b8341517.tar.gz
spark-7b9071eeaa62fd9a51d9e94cfd479224b8341517.tar.bz2
spark-7b9071eeaa62fd9a51d9e94cfd479224b8341517.zip
[SPARK-15910][SQL] Check schema consistency when using Kryo encoder to convert DataFrame to Dataset
## What changes were proposed in this pull request? This PR enforces schema check when converting DataFrame to Dataset using Kryo encoder. For example: **Before the change:** Schema is NOT checked when converting DataFrame to Dataset using kryo encoder. ``` scala> case class B(b: Int) scala> implicit val encoder = Encoders.kryo[B] scala> val df = Seq((1)).toDF("b") scala> val ds = df.as[B] // Schema compatibility is NOT checked ``` **After the change:** Reports an AnalysisException since the schema is NOT compatible. ``` scala> val ds = Seq((1)).toDF("b").as[B] org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`b` AS BINARY)' due to data type mismatch: cannot cast IntegerType to BinaryType; ... ``` ## How was this patch tested? Unit test. Author: Sean Zhong <seanzhong@databricks.com> Closes #13632 from clockfly/spark-15910.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala9
2 files changed, 13 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index 673c587b18..e72f67c48a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -25,8 +25,8 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast}
import org.apache.spark.sql.catalyst.expressions.objects.{DecodeUsingSerializer, EncodeUsingSerializer}
-import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types._
/**
@@ -209,7 +209,9 @@ object Encoders {
BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
deserializer =
DecodeUsingSerializer[T](
- GetColumnByOrdinal(0, BinaryType), classTag[T], kryo = useKryo),
+ Cast(GetColumnByOrdinal(0, BinaryType), BinaryType),
+ classTag[T],
+ kryo = useKryo),
clsTag = classTag[T]
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 96d85f12e8..f02a3141a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -453,6 +453,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
(KryoData(2), KryoData(2))))
}
+ test("Kryo encoder: check the schema mismatch when converting DataFrame to Dataset") {
+ implicit val kryoEncoder = Encoders.kryo[KryoData]
+ val df = Seq((1)).toDF("a")
+ val e = intercept[AnalysisException] {
+ df.as[KryoData]
+ }.message
+ assert(e.contains("cannot cast IntegerType to BinaryType"))
+ }
+
test("Java encoder") {
implicit val kryoEncoder = Encoders.javaSerialization[JavaData]
val ds = Seq(JavaData(1), JavaData(2)).toDS()