diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2017-02-22 12:42:23 -0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-02-22 12:42:23 -0800 |
commit | 37112fcfcd64db8f84f437e5c54cc3ea039c68f6 (patch) | |
tree | 10daabc020ac169fa7fdd827aa38c0d2c49ea396 /sql/core | |
parent | 1f86e795b87ba93640062f29e87a032924d94b2a (diff) | |
download | spark-37112fcfcd64db8f84f437e5c54cc3ea039c68f6.tar.gz spark-37112fcfcd64db8f84f437e5c54cc3ea039c68f6.tar.bz2 spark-37112fcfcd64db8f84f437e5c54cc3ea039c68f6.zip |
[SPARK-19666][SQL] Skip a property without getter in Java schema inference and allow empty bean in encoder creation
## What changes were proposed in this pull request?
This PR proposes to fix two.
**Skip a property without a getter in beans**
Currently, if we use a JavaBean without the getter as below:
```java
public static class BeanWithoutGetter implements Serializable {
private String a;
public void setA(String a) {
this.a = a;
}
}
BeanWithoutGetter bean = new BeanWithoutGetter();
List<BeanWithoutGetter> data = Arrays.asList(bean);
spark.createDataFrame(data, BeanWithoutGetter.class).show();
```
- Before
It throws an exception as below:
```
java.lang.NullPointerException
at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
```
- After
```
++
||
++
||
++
```
**Supports empty bean in encoder creation**
```java
public static class EmptyBean implements Serializable {}
EmptyBean bean = new EmptyBean();
List<EmptyBean> data = Arrays.asList(bean);
spark.createDataset(data, Encoders.bean(EmptyBean.class)).show();
```
- Before
throws an exception as below:
```
java.lang.UnsupportedOperationException: Cannot infer type for class EmptyBean because it is not bean-compliant
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:436)
at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:341)
```
- After
```
++
||
++
||
++
```
## How was this patch tested?
Unit test in `JavaDataFrameSuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #17013 from HyukjinKwon/SPARK-19666.
Diffstat (limited to 'sql/core')
4 files changed, 33 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index dbe55090ea..234ef2dffc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -1090,14 +1090,14 @@ object SQLContext { */ private[sql] def beansToRows( data: Iterator[_], - beanInfo: BeanInfo, + beanClass: Class[_], attrs: Seq[AttributeReference]): Iterator[InternalRow] = { val extractors = - beanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod) + JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod) val methodsToConverts = extractors.zip(attrs).map { case (e, attr) => (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType)) } - data.map{ element => + data.map { element => new GenericInternalRow( methodsToConverts.map { case (e, convert) => convert(e.invoke(element)) } ): InternalRow diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 72af55c1fa..afc1827e7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql -import java.beans.Introspector import java.io.Closeable import java.util.concurrent.atomic.AtomicReference @@ -347,8 +346,7 @@ class SparkSession private( val className = beanClass.getName val rowRdd = rdd.mapPartitions { iter => // BeanInfo is not serializable so we must rediscover it remotely for each partition. - val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className)) - SQLContext.beansToRows(iter, localBeanInfo, attributeSeq) + SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq) } Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self)) } @@ -374,8 +372,7 @@ class SparkSession private( */ def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = { val attrSeq = getSchema(beanClass) - val beanInfo = Introspector.getBeanInfo(beanClass) - val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq) + val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass, attrSeq) Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq)) } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index c3b94a44c2..a8f814bfae 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -397,4 +397,21 @@ public class JavaDataFrameSuite { Assert.assertTrue(filter4.mightContain(i * 3)); } } + + public static class BeanWithoutGetter implements Serializable { + private String a; + + public void setA(String a) { + this.a = a; + } + } + + @Test + public void testBeanWithoutGetter() { + BeanWithoutGetter bean = new BeanWithoutGetter(); + List<BeanWithoutGetter> data = Arrays.asList(bean); + Dataset<Row> df = spark.createDataFrame(data, BeanWithoutGetter.class); + Assert.assertEquals(df.schema().length(), 0); + Assert.assertEquals(df.collectAsList().size(), 1); + } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 577672ca8e..4581c6ebe9 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -1276,4 +1276,15 @@ public class JavaDatasetSuite implements Serializable { spark.createDataset(data, Encoders.bean(NestedComplicatedJavaBean.class)); ds.collectAsList(); } + + public static class EmptyBean implements Serializable {} + + @Test + public void testEmptyBean() { + EmptyBean bean = new EmptyBean(); + List<EmptyBean> data = Arrays.asList(bean); + Dataset<EmptyBean> df = spark.createDataset(data, Encoders.bean(EmptyBean.class)); + Assert.assertEquals(df.schema().length(), 0); + Assert.assertEquals(df.collectAsList().size(), 1); + } } |