author    Andrew Ash <andrew@andrewash.com>    2014-05-12 19:23:39 -0700
committer Reynold Xin <rxin@apache.org>    2014-05-12 19:23:39 -0700
commit    156df87e7ca0e6cda2cc970ecd1466ce06f7576f
tree      4fd4a1fd67b62d73543b68f058d42e5a3e46e2da /sql
parent    9cf9f18973840f7287f7cfa5ce90efed3225bb30
SPARK-1757 Failing test for saving null primitives with .saveAsParquetFile()
https://issues.apache.org/jira/browse/SPARK-1757

The first test succeeds, but the second test fails with exception:

```
[info] - save and load case class RDD with Nones as parquet *** FAILED *** (14 milliseconds)
[info]   java.lang.RuntimeException: Unsupported datatype StructType(List())
[info]   at scala.sys.package$.error(package.scala:27)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetRelation.scala:201)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetRelation.scala:234)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetRelation.scala:267)
[info]   at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:143)
[info]   at org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:122)
[info]   at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:139)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
[info]   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:264)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:264)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:265)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:265)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:268)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:268)
[info]   at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:66)
[info]   at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:98)
```

Author: Andrew Ash <andrew@andrewash.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #690 from ash211/rdd-parquet-save and squashes the following commits:

747a0b9 [Andrew Ash] Merge pull request #1 from marmbrus/pr/690
54bd00e [Michael Armbrust] Need to put Option first since Option <: Seq.
8f3f281 [Andrew Ash] SPARK-1757 Add failing test for saving SparkSQL Schemas with Option[?] fields as parquet
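The root cause is pattern-match ordering in `ScalaReflection.schemaFor`: `Option` extends `Product`, so a broader guard earlier in the match captured `Option[T]` first, saw a constructor with no parameters, and produced the empty `StructType(List())` in the stack trace above. The following standalone sketch (not part of the commit) reproduces the subtype relationship and shows why the `Option` case must be tried first:

```scala
import scala.reflect.runtime.universe._

object MatchOrderDemo extends App {
  // Option extends Product, so a guard on Product also matches Option[T];
  // the old ordering sent Option through the Product branch, which saw a
  // constructor with no parameters and yielded StructType(List()).
  println(typeOf[Option[Int]] <:< typeOf[Product]) // true

  // With the Option case listed first (as in this commit), Option[T] is
  // unwrapped to T instead of being mistaken for a field-less Product.
  def classify(tpe: Type): String = tpe match {
    case t if t <:< typeOf[Option[_]] =>
      val TypeRef(_, _, Seq(optType)) = t
      s"option of $optType"
    case t if t <:< typeOf[Product] => "product"
    case _                          => "other"
  }

  println(classify(typeOf[Option[Int]]))   // option of Int
  println(classify(typeOf[(Int, String)])) // product
}
```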
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala   6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala     44
2 files changed, 47 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 792ef6cee6..196695a0a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -41,6 +41,9 @@ object ScalaReflection {
   /** Returns a catalyst DataType for the given Scala Type using reflection. */
   def schemaFor(tpe: `Type`): DataType = tpe match {
+    case t if t <:< typeOf[Option[_]] =>
+      val TypeRef(_, _, Seq(optType)) = t
+      schemaFor(optType)
     case t if t <:< typeOf[Product] =>
       val params = t.member("<init>": TermName).asMethod.paramss
       StructType(
@@ -59,9 +62,6 @@ object ScalaReflection {
     case t if t <:< typeOf[String] => StringType
     case t if t <:< typeOf[Timestamp] => TimestampType
     case t if t <:< typeOf[BigDecimal] => DecimalType
-    case t if t <:< typeOf[Option[_]] =>
-      val TypeRef(_, _, Seq(optType)) = t
-      schemaFor(optType)
     case t if t <:< typeOf[java.lang.Integer] => IntegerType
     case t if t <:< typeOf[java.lang.Long] => LongType
     case t if t <:< typeOf[java.lang.Double] => DoubleType
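With the case moved up, `schemaFor` strips the `Option` wrapper and recurses on the element type, so an `Option[Int]` field reflects to the element's Catalyst type. Below is a minimal self-contained sketch of that recursion; the `DataType` stand-ins are hypothetical simplifications, not the real `catalyst` classes:

```scala
import scala.reflect.runtime.universe._

// Hypothetical stand-ins for the Catalyst data types used in this diff.
sealed trait DataType
case object IntegerType extends DataType
case object DoubleType  extends DataType
case object BooleanType extends DataType

object MiniReflection {
  // Mirrors the fixed ordering: unwrap Option before any broader guard.
  def schemaFor(tpe: Type): DataType = tpe match {
    case t if t <:< typeOf[Option[_]] =>
      val TypeRef(_, _, Seq(optType)) = t
      schemaFor(optType) // recurse on the element type
    case t if t <:< typeOf[Int]     => IntegerType
    case t if t <:< typeOf[Double]  => DoubleType
    case t if t <:< typeOf[Boolean] => BooleanType
    case t                          => sys.error(s"Unsupported datatype $t")
  }
}

object MiniReflectionDemo extends App {
  println(MiniReflection.schemaFor(typeOf[Option[Int]]))     // IntegerType
  println(MiniReflection.schemaFor(typeOf[Option[Boolean]])) // BooleanType
}
```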
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index d9c9b9a076..ff1677eb8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -42,6 +42,20 @@ import org.apache.spark.sql.test.TestSQLContext._
 case class TestRDDEntry(key: Int, value: String)
+case class NullReflectData(
+    intField: java.lang.Integer,
+    longField: java.lang.Long,
+    floatField: java.lang.Float,
+    doubleField: java.lang.Double,
+    booleanField: java.lang.Boolean)
+
+case class OptionalReflectData(
+    intField: Option[Int],
+    longField: Option[Long],
+    floatField: Option[Float],
+    doubleField: Option[Double],
+    booleanField: Option[Boolean])
+
 class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
   import TestData._
   TestData // Load test data tables.
@@ -195,5 +209,35 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
     Utils.deleteRecursively(ParquetTestData.testDir)
     ParquetTestData.writeFile()
   }
+
+  test("save and load case class RDD with nulls as parquet") {
+    val data = NullReflectData(null, null, null, null, null)
+    val rdd = sparkContext.parallelize(data :: Nil)
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    rdd.saveAsParquetFile(path)
+    val readFile = parquetFile(path)
+
+    val rdd_saved = readFile.collect()
+    assert(rdd_saved(0) === Seq.fill(5)(null))
+    Utils.deleteRecursively(file)
+    assert(true)
+  }
+
+  test("save and load case class RDD with Nones as parquet") {
+    val data = OptionalReflectData(null, null, null, null, null)
+    val rdd = sparkContext.parallelize(data :: Nil)
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    rdd.saveAsParquetFile(path)
+    val readFile = parquetFile(path)
+
+    val rdd_saved = readFile.collect()
+    assert(rdd_saved(0) === Seq.fill(5)(null))
+    Utils.deleteRecursively(file)
+    assert(true)
+  }
 }
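For context, here is a hedged sketch of what the fix enables end to end, written against the Spark 1.0-era API this suite uses; `Person`, the output path, and the sample values are illustrative rather than part of the commit, and the `TestSQLContext` import mirrors the one at the top of the suite:

```scala
import org.apache.spark.sql.test.TestSQLContext._

// Hypothetical case class with Option fields; before this fix, reflecting
// over it threw "Unsupported datatype StructType(List())".
case class Person(name: String, age: Option[Int])

val people = sparkContext.parallelize(
  Person("alice", Some(30)) :: Person("bob", None) :: Nil)

people.saveAsParquetFile("/tmp/people.parquet") // no longer throws
val loaded = parquetFile("/tmp/people.parquet")
loaded.collect().foreach(println)
```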