author     hyukjinkwon <gurwls223@gmail.com>    2017-02-22 12:42:23 -0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-02-22 12:42:23 -0800
commit     37112fcfcd64db8f84f437e5c54cc3ea039c68f6 (patch)
tree       10daabc020ac169fa7fdd827aa38c0d2c49ea396 /sql/core/src/main/scala/org
parent     1f86e795b87ba93640062f29e87a032924d94b2a (diff)
[SPARK-19666][SQL] Skip a property without a getter in Java schema inference and allow an empty bean in encoder creation
## What changes were proposed in this pull request?

This PR proposes to fix two issues.

**Skip a property without a getter in beans**

Currently, if we use a JavaBean without a getter as below:

```java
public static class BeanWithoutGetter implements Serializable {
  private String a;

  public void setA(String a) {
    this.a = a;
  }
}

BeanWithoutGetter bean = new BeanWithoutGetter();
List<BeanWithoutGetter> data = Arrays.asList(bean);
spark.createDataFrame(data, BeanWithoutGetter.class).show();
```

- Before

It throws an exception as below:

```
java.lang.NullPointerException
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
```

- After

```
++
||
++
||
++
```

**Support an empty bean in encoder creation**

```java
public static class EmptyBean implements Serializable {}

EmptyBean bean = new EmptyBean();
List<EmptyBean> data = Arrays.asList(bean);
spark.createDataset(data, Encoders.bean(EmptyBean.class)).show();
```

- Before

It throws an exception as below:

```
java.lang.UnsupportedOperationException: Cannot infer type for class EmptyBean because it is not bean-compliant
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:436)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:341)
```

- After

```
++
||
++
||
++
```

## How was this patch tested?

Unit test in `JavaDataFrameSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17013 from HyukjinKwon/SPARK-19666.
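For context, here is a minimal standalone Scala sketch of the property filtering this patch introduces. The real helper is `JavaTypeInference.getJavaBeanReadableProperties`, used in the diff below; this version is illustrative and only assumes standard `java.beans.Introspector` semantics:

```scala
import java.beans.{Introspector, PropertyDescriptor}

// Sketch: collect only the *readable* bean properties. Introspector still
// reports a PropertyDescriptor for a setter-only property, but its
// getReadMethod is null, which is the value that previously flowed into
// schema inference and triggered the NullPointerException.
object ReadablePropertiesSketch {
  def readableProperties(beanClass: Class[_]): Array[PropertyDescriptor] = {
    Introspector.getBeanInfo(beanClass).getPropertyDescriptors
      .filterNot(_.getName == "class")  // drop the synthetic "class" property
      .filter(_.getReadMethod != null)  // skip properties without a getter
  }
}
```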
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala    6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala  7
2 files changed, 5 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index dbe55090ea..234ef2dffc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1090,14 +1090,14 @@ object SQLContext {
    */
   private[sql] def beansToRows(
       data: Iterator[_],
-      beanInfo: BeanInfo,
+      beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
-      beanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
+      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
     val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
       (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
     }
-    data.map{ element =>
+    data.map { element =>
       new GenericInternalRow(
         methodsToConverts.map { case (e, convert) => convert(e.invoke(element)) }
       ): InternalRow
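To see why the old `getPropertyDescriptors ... map(_.getReadMethod)` pattern was unsafe, here is a small hypothetical demonstration (the `SetterOnly` class and `NullGetterDemo` object are illustrative names, mirroring `BeanWithoutGetter` from the commit message):

```scala
import java.beans.Introspector

// A bean with a setter but no getter, like BeanWithoutGetter above.
class SetterOnly {
  private var a: String = _
  def setA(v: String): Unit = { a = v }
}

object NullGetterDemo {
  def main(args: Array[String]): Unit = {
    Introspector.getBeanInfo(classOf[SetterOnly]).getPropertyDescriptors
      .filterNot(_.getName == "class")
      .foreach { pd =>
        // Prints something like "a: read=null, write=public void SetterOnly.setA(...)";
        // mapping getReadMethod over such descriptors yields nulls, hence the NPE.
        println(s"${pd.getName}: read=${pd.getReadMethod}, write=${pd.getWriteMethod}")
      }
  }
}
```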
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 72af55c1fa..afc1827e7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import java.beans.Introspector
 import java.io.Closeable
 import java.util.concurrent.atomic.AtomicReference
 
@@ -347,8 +346,7 @@ class SparkSession private(
     val className = beanClass.getName
     val rowRdd = rdd.mapPartitions { iter =>
       // BeanInfo is not serializable so we must rediscover it remotely for each partition.
-      val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
-      SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
+      SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq)
     }
     Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
   }
@@ -374,8 +372,7 @@ class SparkSession private(
    */
   def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = {
     val attrSeq = getSchema(beanClass)
-    val beanInfo = Introspector.getBeanInfo(beanClass)
-    val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
+    val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass, attrSeq)
     Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq))
   }
 