author     hyukjinkwon <gurwls223@gmail.com>    2016-11-21 13:23:32 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-11-21 13:23:32 -0800
commit     a2d464770cd183daa7d727bf377bde9c21e29e6a (patch)
tree       9268ffe990937be39519112f3fc21c2c70fef6cf /sql
parent     ddd02f50bb7458410d65427321efc75da5e65224 (diff)
[SPARK-17765][SQL] Support for writing out user-defined type in ORC datasource
## What changes were proposed in this pull request?

This PR adds support for `UserDefinedType` when writing out in the ORC data source, instead of throwing a `ClassCastException`.

In more detail, `OrcStruct` is created based on the string from `DataType.catalogString`. For a user-defined type, it seems `catalogString` returns `sqlType.simpleString` by default [1]. However, during type dispatching to match the output with the schema, it tries to cast to, for example, `StructType` [2]. So, running the code below (`MyDenseVector` was borrowed from [3]):

```scala
val data = Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))))
val udtDF = data.toDF("id", "vectors")
udtDF.write.orc("/tmp/test.orc")
```

ends up throwing an exception as below:

```
java.lang.ClassCastException: org.apache.spark.sql.UDT$MyDenseVectorUDT cannot be cast to org.apache.spark.sql.types.ArrayType
  at org.apache.spark.sql.hive.HiveInspectors$class.wrapperFor(HiveInspectors.scala:381)
  at org.apache.spark.sql.hive.orc.OrcSerializer.wrapperFor(OrcFileFormat.scala:164)
...
```

So, this PR uses `UserDefinedType.sqlType` when finding the correct converter for writing out in the ORC data source.

[1] https://github.com/apache/spark/blob/dfdcab00c7b6200c22883baa3ebc5818be09556f/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala#L95
[2] https://github.com/apache/spark/blob/d2dc8c4a162834818190ffd82894522c524ca3e5/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala#L326
[3] https://github.com/apache/spark/blob/2bfed1a0c5be7d0718fd574a4dad90f4f6b44be7/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala#L38-L70

## How was this patch tested?

Unit tests in `OrcQuerySuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15361 from HyukjinKwon/SPARK-17765.
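The core idea of the fix can be sketched as follows. This is a minimal sketch of the unwrapping step only, not the actual patch (the real change is inside `wrapperFor` in `HiveInspectors.scala`, shown in the diff below), and the helper name `toWritableType` is hypothetical. Note that in this Spark version the UDT API is `private[spark]`, so code like this can only live inside Spark's own source tree:

```scala
import org.apache.spark.sql.types.{DataType, UserDefinedType}

// Minimal sketch: before choosing a converter for a Catalyst DataType,
// unwrap a UserDefinedType to its underlying sqlType (recursively, in case
// the sqlType is itself a UDT), so the existing converters apply to it.
def toWritableType(dataType: DataType): DataType = dataType match {
  case udt: UserDefinedType[_] => toWritableType(udt.sqlType)
  case other => other
}
```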
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala     3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala  10
2 files changed, 13 insertions, 0 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index e303065127..52aa1088ac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -246,6 +246,9 @@ private[hive] trait HiveInspectors {
    * Wraps with Hive types based on object inspector.
    */
   protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match {
+    case _ if dataType.isInstanceOf[UserDefinedType[_]] =>
+      val sqlType = dataType.asInstanceOf[UserDefinedType[_]].sqlType
+      wrapperFor(oi, sqlType)
     case x: ConstantObjectInspector =>
       (o: Any) =>
         x.getWritableConstantValue
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index a628977af2..b8761e9de2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -93,6 +93,16 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("Read/write UserDefinedType") {
+    withTempPath { path =>
+      val data = Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))))
+      val udtDF = data.toDF("id", "vectors")
+      udtDF.write.orc(path.getAbsolutePath)
+      val readBack = spark.read.schema(udtDF.schema).orc(path.getAbsolutePath)
+      checkAnswer(udtDF, readBack)
+    }
+  }
+
   test("Creating case class RDD table") {
     val data = (1 to 100).map(i => (i, s"val_$i"))
     sparkContext.parallelize(data).toDF().createOrReplaceTempView("t")
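The new test reuses the `MyDenseVector` example UDT from `UserDefinedTypeSuite` (reference [3] in the commit message). As a rough sketch of what such a UDT looks like (names and details are approximate, not copied from the suite), its Catalyst representation, `sqlType`, is an array of doubles, which is exactly the type the patched `wrapperFor` now dispatches on. Since the UDT API is `private[spark]` in this version, a class like this only compiles inside Spark's own source tree, as the test suite's does:

```scala
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types._

// Approximate sketch of a dense-vector UDT; the real one lives in
// sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala.
class MyDenseVector(val data: Array[Double]) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case v: MyDenseVector => java.util.Arrays.equals(data, v.data)
    case _ => false
  }
}

class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
  // The underlying Catalyst type; this is what the patched wrapperFor unwraps to.
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  override def serialize(vec: MyDenseVector): Any =
    new GenericArrayData(vec.data.map(_.asInstanceOf[Any]))

  override def deserialize(datum: Any): MyDenseVector = datum match {
    case values: ArrayData => new MyDenseVector(values.toDoubleArray())
  }

  override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
}
```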