aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main
diff options
context:
space:
mode:
authorMichal Senkyr <mike.senkyr@gmail.com>2017-01-06 15:05:20 +0800
committerWenchen Fan <wenchen@databricks.com>2017-01-06 15:05:20 +0800
commit903bb8e8a2b84b9ea82acbb8ae9d58754862be3a (patch)
tree1df577fa49e4fd3400920234cc79865f40fbebdc /sql/catalyst/src/main
parentbcc510b021391035abe6d07c5b82bb0f0be31167 (diff)
downloadspark-903bb8e8a2b84b9ea82acbb8ae9d58754862be3a.tar.gz
spark-903bb8e8a2b84b9ea82acbb8ae9d58754862be3a.tar.bz2
spark-903bb8e8a2b84b9ea82acbb8ae9d58754862be3a.zip
[SPARK-16792][SQL] Dataset containing a Case Class with a List type causes a CompileException (converting sequence to list)
## What changes were proposed in this pull request?

Added a `to` call at the end of the code generated by `ScalaReflection.deserializerFor` if the requested type is not a supertype of `WrappedArray[_]` that uses `CanBuildFrom[_, _, _]` to convert the result into an arbitrary subtype of `Seq[_]`.

Care was taken to preserve the original deserialization where possible, to avoid the overhead of conversion in cases where it is not needed. `ScalaReflection.serializerFor` could already be used to serialize any `Seq[_]`, so it was not altered. `SQLImplicits` had to be altered and new implicit encoders added to permit serialization of other sequence types.

Also fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException.

## How was this patch tested?

```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```

Also manual execution of the following sets of commands in the Spark shell:

```scala
case class TestCC(key: Int, letters: List[String])
val ds1 = sc.makeRDD(Seq(
  (List("D")),
  (List("S","H")),
  (List("F","H")),
  (List("D","L","L"))
)).map(x=>(x.length,x)).toDF("key","letters").as[TestCC]
val test1=ds1.map{_.key}
test1.show
```

```scala
case class X(l: List[String])
spark.createDataset(Seq(List("A"))).map(X).show
```

```scala
spark.sqlContext.createDataset(sc.parallelize(List(1) :: Nil)).collect
```

After adding arbitrary sequence support, also tested with the following commands:

```scala
case class QueueClass(q: scala.collection.immutable.Queue[Int])

spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect
```

Author: Michal Senkyr <mike.senkyr@gmail.com>

Closes #16240 from michalsenkyr/sql-caseclass-list-fix.
Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 40
1 file changed, 39 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index ad218cf88d..7f7dd51aa2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -312,12 +312,50 @@ object ScalaReflection extends ScalaReflection {
"array",
ObjectType(classOf[Array[Any]]))
- StaticInvoke(
+ val wrappedArray = StaticInvoke(
scala.collection.mutable.WrappedArray.getClass,
ObjectType(classOf[Seq[_]]),
"make",
array :: Nil)
+ if (localTypeOf[scala.collection.mutable.WrappedArray[_]] <:< t.erasure) {
+ wrappedArray
+ } else {
+ // Convert to another type using `to`
+ val cls = mirror.runtimeClass(t.typeSymbol.asClass)
+ import scala.collection.generic.CanBuildFrom
+ import scala.reflect.ClassTag
+
+ // Some canBuildFrom methods take an implicit ClassTag parameter
+ val cbfParams = try {
+ cls.getDeclaredMethod("canBuildFrom", classOf[ClassTag[_]])
+ StaticInvoke(
+ ClassTag.getClass,
+ ObjectType(classOf[ClassTag[_]]),
+ "apply",
+ StaticInvoke(
+ cls,
+ ObjectType(classOf[Class[_]]),
+ "getClass"
+ ) :: Nil
+ ) :: Nil
+ } catch {
+ case _: NoSuchMethodException => Nil
+ }
+
+ Invoke(
+ wrappedArray,
+ "to",
+ ObjectType(cls),
+ StaticInvoke(
+ cls,
+ ObjectType(classOf[CanBuildFrom[_, _, _]]),
+ "canBuildFrom",
+ cbfParams
+ ) :: Nil
+ )
+ }
+
case t if t <:< localTypeOf[Map[_, _]] =>
// TODO: add walked type path for map
val TypeRef(_, _, Seq(keyType, valueType)) = t