author      Cheng Lian <lian@databricks.com>        2014-12-16 21:16:03 -0800
committer   Michael Armbrust <michael@databricks.com>      2014-12-16 21:16:03 -0800
commit      3b395e10510782474789c9098084503f98ca4830 (patch)
tree        6a3193af08fb8c82f8f8f6cab6fc2a5ed5c0fbc3 /sql
parent      b85044ecfa825ff68c8e57eeffa4d9f214335e66 (diff)
[SPARK-4798][SQL] A new set of Parquet testing API and test suites
This PR provides a set of Parquet testing APIs (see trait `ParquetTest`) that enable developers to write more concise test cases. A new set of Parquet test suites built upon this API is added, aiming to replace the old `ParquetQuerySuite`. To avoid potential merge conflicts, the old testing code is not removed yet. The following classes can be safely removed after most Parquet-related PRs are handled:

- `ParquetQuerySuite`
- `ParquetTestData`

Review on Reviewable: https://reviewable.io/reviews/apache/spark/3644

Author: Cheng Lian <lian@databricks.com>

Closes #3644 from liancheng/parquet-tests and squashes the following commits:

800e745 [Cheng Lian] Enforces ordering of test output
3bb8731 [Cheng Lian] Refactors HiveParquetSuite
aa2cb2e [Cheng Lian] Decouples ParquetTest and TestSQLContext
7b43a68 [Cheng Lian] Updates ParquetTest Scaladoc
7f07af0 [Cheng Lian] Adds a new set of Parquet test suites
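As a rough illustration of the conciseness this API aims for, a suite built on `ParquetTest` could look like the sketch below. The suite name, table name, and data are hypothetical; the sketch assumes a `TestSQLContext`-backed suite, mirroring the test suites added in this patch:

    package org.apache.spark.sql.parquet

    import org.apache.spark.sql.QueryTest
    import org.apache.spark.sql.catalyst.expressions.Row
    import org.apache.spark.sql.test.TestSQLContext
    import org.apache.spark.sql.test.TestSQLContext._

    class ExampleParquetSuite extends QueryTest with ParquetTest {
      val sqlContext = TestSQLContext

      test("round-trips a small dataset through Parquet") {
        // withParquetTable writes the tuples to a temporary Parquet file, registers
        // the result as temporary table "t", runs the body, then drops the table
        // and deletes the file.
        withParquetTable((1 to 3).map(i => (i, i.toString)), "t") {
          checkAnswer(sql("SELECT _1 FROM t"), (1 to 3).map(Row.apply(_)))
        }
      }
    }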
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala | 127
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala | 253
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala | 287
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala | 110
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala | 164
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala | 119
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala | 6
8 files changed, 989 insertions(+), 81 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f5abf71d6c..f5bf935522 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -188,6 +188,10 @@ private[sql] trait SQLConf {
*/
def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap }
+ private[spark] def unsetConf(key: String) {
+ settings -= key
+ }
+
private[spark] def clear() {
settings.clear()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
new file mode 100644
index 0000000000..b4d48902fd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+import scala.util.Try
+
+import org.apache.spark.sql.{SQLContext, SchemaRDD}
+import org.apache.spark.sql.catalyst.util
+import org.apache.spark.util.Utils
+
+/**
+ * A helper trait that provides convenient facilities for Parquet testing.
+ *
+ * NOTE: Since classes `Tuple1` to `Tuple22` all extend `Product`, it is usually more
+ * convenient to use tuples rather than special case classes when writing test cases/suites.
+ * In particular, `Tuple1.apply` can be used to easily wrap a single type/value.
+ */
+trait ParquetTest {
+ val sqlContext: SQLContext
+
+ import sqlContext._
+
+ protected def configuration = sparkContext.hadoopConfiguration
+
+ /**
+ * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
+ * configurations to their previous values.
+ *
+ * @todo Probably this method should be moved to a more general place
+ */
+ protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+ val (keys, values) = pairs.unzip
+ val currentValues = keys.map(key => Try(getConf(key)).toOption)
+ (keys, values).zipped.foreach(setConf)
+ try f finally {
+ keys.zip(currentValues).foreach {
+ case (key, Some(value)) => setConf(key, value)
+ case (key, None) => unsetConf(key)
+ }
+ }
+ }
+
+ /**
+ * Generates a temporary path without creating the actual file/directory, then passes it to `f`.
+ * If a file/directory is created there by `f`, it will be deleted after `f` returns.
+ *
+ * @todo Probably this method should be moved to a more general place
+ */
+ protected def withTempPath(f: File => Unit): Unit = {
+ val file = util.getTempFilePath("parquetTest").getCanonicalFile
+ try f(file) finally if (file.exists()) Utils.deleteRecursively(file)
+ }
+
+ /**
+ * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+ * returns.
+ *
+ * @todo Probably this method should be moved to a more general place
+ */
+ protected def withTempDir(f: File => Unit): Unit = {
+ val dir = Utils.createTempDir().getCanonicalFile
+ try f(dir) finally Utils.deleteRecursively(dir)
+ }
+
+ /**
+ * Writes `data` to a Parquet file, whose path is then passed to `f`. The file will be deleted
+ * after `f` returns.
+ */
+ protected def withParquetFile[T <: Product: ClassTag: TypeTag]
+ (data: Seq[T])
+ (f: String => Unit): Unit = {
+ withTempPath { file =>
+ sparkContext.parallelize(data).saveAsParquetFile(file.getCanonicalPath)
+ f(file.getCanonicalPath)
+ }
+ }
+
+ /**
+ * Writes `data` to a Parquet file and reads it back as a SchemaRDD, which is then passed to `f`.
+ * The Parquet file will be deleted after `f` returns.
+ */
+ protected def withParquetRDD[T <: Product: ClassTag: TypeTag]
+ (data: Seq[T])
+ (f: SchemaRDD => Unit): Unit = {
+ withParquetFile(data)(path => f(parquetFile(path)))
+ }
+
+ /**
+ * Drops temporary table `tableName` after calling `f`.
+ */
+ protected def withTempTable(tableName: String)(f: => Unit): Unit = {
+ try f finally dropTempTable(tableName)
+ }
+
+ /**
+ * Writes `data` to a Parquet file, reads it back as a SchemaRDD and registers it as a temporary
+ * table named `tableName`, then calls `f`. The temporary table, together with the Parquet file, will
+ * be dropped/deleted after `f` returns.
+ */
+ protected def withParquetTable[T <: Product: ClassTag: TypeTag]
+ (data: Seq[T], tableName: String)
+ (f: => Unit): Unit = {
+ withParquetRDD(data) { rdd =>
+ rdd.registerTempTable(tableName)
+ withTempTable(tableName)(f)
+ }
+ }
+}
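These helpers are meant to nest. Continuing the hypothetical suite sketched in the commit description above (and additionally assuming `org.apache.spark.sql.SQLConf` and `org.apache.spark.sql.catalyst.dsl.expressions._` are imported), a test can wrap `withParquetRDD` in `withSQLConf` so that both the temporary Parquet file and the overridden configuration are cleaned up when the body finishes, even if it throws:

    test("filter pushdown flag is scoped to the block") {
      // The flag is set only for the duration of this block; withSQLConf restores
      // the previous value (or unsets the key) in its `finally` clause.
      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
        // `rdd` reads from a temporary Parquet file that withParquetRDD deletes
        // once this inner block returns.
        withParquetRDD((1 to 4).map(Tuple1.apply)) { rdd =>
          assert(rdd.where('_1 > 2).count() === 2L)
        }
      }
    }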
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
new file mode 100644
index 0000000000..111a459e6d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import parquet.filter2.predicate.Operators._
+import parquet.filter2.predicate.{FilterPredicate, Operators}
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate, Row}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
+
+/**
+ * A test suite that tests filter pushdown optimization based on the Parquet filter2 API.
+ *
+ * Notice that `!(a cmp b)` is always transformed into its negated form `a cmp' b` by the
+ * `BooleanSimplification` optimization rule whenever possible. As a result, the predicate `!(a < 1)`
+ * results in a `GtEq` filter predicate rather than a `Not`.
+ *
+ * @todo Add test cases for `IsNull` and `IsNotNull` after merging PR #3367
+ */
+class ParquetFilterSuite extends QueryTest with ParquetTest {
+ val sqlContext = TestSQLContext
+
+ private def checkFilterPushdown(
+ rdd: SchemaRDD,
+ output: Seq[Symbol],
+ predicate: Predicate,
+ filterClass: Class[_ <: FilterPredicate],
+ checker: (SchemaRDD, Any) => Unit,
+ expectedResult: => Any): Unit = {
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+ val query = rdd.select(output.map(_.attr): _*).where(predicate)
+
+ val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect {
+ case plan: ParquetTableScan => plan.columnPruningPred
+ }.flatten.reduceOption(_ && _)
+
+ assert(maybeAnalyzedPredicate.isDefined)
+ maybeAnalyzedPredicate.foreach { pred =>
+ val maybeFilter = ParquetFilters.createFilter(pred)
+ assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
+ maybeFilter.foreach(f => assert(f.getClass === filterClass))
+ }
+
+ checker(query, expectedResult)
+ }
+ }
+
+ private def checkFilterPushdown
+ (rdd: SchemaRDD, output: Symbol*)
+ (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
+ (expectedResult: => Any): Unit = {
+ checkFilterPushdown(rdd, output, predicate, filterClass, checkAnswer _, expectedResult)
+ }
+
+ def checkBinaryFilterPushdown
+ (rdd: SchemaRDD, output: Symbol*)
+ (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
+ (expectedResult: => Any): Unit = {
+ def checkBinaryAnswer(rdd: SchemaRDD, result: Any): Unit = {
+ val actual = rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq
+ val expected = result match {
+ case s: Seq[_] => s.map(_.asInstanceOf[Row].getAs[Array[Byte]](0).mkString(","))
+ case s => Seq(s.asInstanceOf[Array[Byte]].mkString(","))
+ }
+ assert(actual.sorted === expected.sorted)
+ }
+ checkFilterPushdown(rdd, output, predicate, filterClass, checkBinaryAnswer _, expectedResult)
+ }
+
+ test("filter pushdown - boolean") {
+ withParquetRDD((true :: false :: Nil).map(Tuple1.apply)) { rdd =>
+ checkFilterPushdown(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(true)
+ checkFilterPushdown(rdd, '_1)('_1 !== true, classOf[Operators.Not])(false)
+ }
+ }
+
+ test("filter pushdown - integer") {
+ withParquetRDD((1 to 4).map(Tuple1.apply)) { rdd =>
+ checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+ (2 to 4).map(Row.apply(_))
+ }
+
+ checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [Integer]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [Integer]])(4)
+ checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[Integer]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[Integer]])(4)
+
+ checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [Integer]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [Integer]])(4)
+ checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[Integer]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[Integer]])(4)
+
+ checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[Integer]])(4)
+ checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+ checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+ Seq(Row(1), Row(4))
+ }
+ }
+ }
+
+ test("filter pushdown - long") {
+ withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toLong))) { rdd =>
+ checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+ (2 to 4).map(Row.apply(_))
+ }
+
+ checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Long]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Long]])(4)
+ checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Long]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Long]])(4)
+
+ checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Long]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Long]])(4)
+ checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Long]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Long]])(4)
+
+ checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Long]])(4)
+ checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+ checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+ Seq(Row(1), Row(4))
+ }
+ }
+ }
+
+ test("filter pushdown - float") {
+ withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toFloat))) { rdd =>
+ checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+ (2 to 4).map(Row.apply(_))
+ }
+
+ checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Float]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Float]])(4)
+ checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Float]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Float]])(4)
+
+ checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Float]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Float]])(4)
+ checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Float]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Float]])(4)
+
+ checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Float]])(4)
+ checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+ checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+ Seq(Row(1), Row(4))
+ }
+ }
+ }
+
+ test("filter pushdown - double") {
+ withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toDouble))) { rdd =>
+ checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+ (2 to 4).map(Row.apply(_))
+ }
+
+ checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Double]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Double]])(4)
+ checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Double]])(1)
+ checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Double]])(4)
+
+ checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq[Integer]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Double]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Double]])(4)
+ checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Double]])(1)
+ checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Double]])(4)
+
+ checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Double]])(4)
+ checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+ checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+ Seq(Row(1), Row(4))
+ }
+ }
+ }
+
+ test("filter pushdown - string") {
+ withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toString))) { rdd =>
+ checkFilterPushdown(rdd, '_1)('_1 === "1", classOf[Eq[String]])("1")
+ checkFilterPushdown(rdd, '_1)('_1 !== "1", classOf[Operators.Not]) {
+ (2 to 4).map(i => Row.apply(i.toString))
+ }
+
+ checkFilterPushdown(rdd, '_1)('_1 < "2", classOf[Lt [java.lang.String]])("1")
+ checkFilterPushdown(rdd, '_1)('_1 > "3", classOf[Gt [java.lang.String]])("4")
+ checkFilterPushdown(rdd, '_1)('_1 <= "1", classOf[LtEq[java.lang.String]])("1")
+ checkFilterPushdown(rdd, '_1)('_1 >= "4", classOf[GtEq[java.lang.String]])("4")
+
+ checkFilterPushdown(rdd, '_1)(Literal("1") === '_1, classOf[Eq [java.lang.String]])("1")
+ checkFilterPushdown(rdd, '_1)(Literal("2") > '_1, classOf[Lt [java.lang.String]])("1")
+ checkFilterPushdown(rdd, '_1)(Literal("3") < '_1, classOf[Gt [java.lang.String]])("4")
+ checkFilterPushdown(rdd, '_1)(Literal("1") >= '_1, classOf[LtEq[java.lang.String]])("1")
+ checkFilterPushdown(rdd, '_1)(Literal("4") <= '_1, classOf[GtEq[java.lang.String]])("4")
+
+ checkFilterPushdown(rdd, '_1)(!('_1 < "4"), classOf[Operators.GtEq[java.lang.String]])("4")
+ checkFilterPushdown(rdd, '_1)('_1 > "2" && '_1 < "4", classOf[Operators.And])("3")
+ checkFilterPushdown(rdd, '_1)('_1 < "2" || '_1 > "3", classOf[Operators.Or]) {
+ Seq(Row("1"), Row("4"))
+ }
+ }
+ }
+
+ test("filter pushdown - binary") {
+ implicit class IntToBinary(int: Int) {
+ def b: Array[Byte] = int.toString.getBytes("UTF-8")
+ }
+
+ withParquetRDD((1 to 4).map(i => Tuple1.apply(i.b))) { rdd =>
+ checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b)
+ checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.Not]) {
+ (2 to 4).map(i => Row.apply(i.b)).toSeq
+ }
+
+ checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b, classOf[Lt [Array[Byte]]])(1.b)
+ checkBinaryFilterPushdown(rdd, '_1)('_1 > 3.b, classOf[Gt [Array[Byte]]])(4.b)
+ checkBinaryFilterPushdown(rdd, '_1)('_1 <= 1.b, classOf[LtEq[Array[Byte]]])(1.b)
+ checkBinaryFilterPushdown(rdd, '_1)('_1 >= 4.b, classOf[GtEq[Array[Byte]]])(4.b)
+
+ checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) === '_1, classOf[Eq [Array[Byte]]])(1.b)
+ checkBinaryFilterPushdown(rdd, '_1)(Literal(2.b) > '_1, classOf[Lt [Array[Byte]]])(1.b)
+ checkBinaryFilterPushdown(rdd, '_1)(Literal(3.b) < '_1, classOf[Gt [Array[Byte]]])(4.b)
+ checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) >= '_1, classOf[LtEq[Array[Byte]]])(1.b)
+ checkBinaryFilterPushdown(rdd, '_1)(Literal(4.b) <= '_1, classOf[GtEq[Array[Byte]]])(4.b)
+
+ checkBinaryFilterPushdown(rdd, '_1)(!('_1 < 4.b), classOf[Operators.GtEq[Array[Byte]]])(4.b)
+ checkBinaryFilterPushdown(rdd, '_1)('_1 > 2.b && '_1 < 4.b, classOf[Operators.And])(3.b)
+ checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b || '_1 > 3.b, classOf[Operators.Or]) {
+ Seq(Row(1.b), Row(4.b))
+ }
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
new file mode 100644
index 0000000000..10a01474e9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import parquet.example.data.simple.SimpleGroup
+import parquet.example.data.{Group, GroupWriter}
+import parquet.hadoop.api.WriteSupport
+import parquet.hadoop.api.WriteSupport.WriteContext
+import parquet.hadoop.metadata.CompressionCodecName
+import parquet.hadoop.{ParquetFileWriter, ParquetWriter}
+import parquet.io.api.RecordConsumer
+import parquet.schema.{MessageType, MessageTypeParser}
+
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.types.DecimalType
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext._
+import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
+
+// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
+// with an empty configuration (it is after all not intended to be used in this way?)
+// and members are private so we need to make our own in order to pass the schema
+// to the writer.
+private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] {
+ var groupWriter: GroupWriter = null
+
+ override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+ groupWriter = new GroupWriter(recordConsumer, schema)
+ }
+
+ override def init(configuration: Configuration): WriteContext = {
+ new WriteContext(schema, new java.util.HashMap[String, String]())
+ }
+
+ override def write(record: Group) {
+ groupWriter.write(record)
+ }
+}
+
+/**
+ * A test suite that tests basic Parquet I/O.
+ */
+class ParquetIOSuite extends QueryTest with ParquetTest {
+ val sqlContext = TestSQLContext
+
+ /**
+ * Writes `data` to a Parquet file, reads it back, and checks the file contents.
+ */
+ protected def checkParquetFile[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = {
+ withParquetRDD(data)(checkAnswer(_, data))
+ }
+
+ test("basic data types (without binary)") {
+ val data = (1 to 4).map { i =>
+ (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+ }
+ checkParquetFile(data)
+ }
+
+ test("raw binary") {
+ val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
+ withParquetRDD(data) { rdd =>
+ assertResult(data.map(_._1.mkString(",")).sorted) {
+ rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
+ }
+ }
+ }
+
+ test("string") {
+ val data = (1 to 4).map(i => Tuple1(i.toString))
+ // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
+ // as we store the Spark SQL schema in the extra metadata.
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data))
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data))
+ }
+
+ test("fixed-length decimals") {
+ def makeDecimalRDD(decimal: DecimalType): SchemaRDD =
+ sparkContext
+ .parallelize(0 to 1000)
+ .map(i => Tuple1(i / 100.0))
+ .select('_1 cast decimal)
+
+ for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
+ withTempPath { dir =>
+ val data = makeDecimalRDD(DecimalType(precision, scale))
+ data.saveAsParquetFile(dir.getCanonicalPath)
+ checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+ }
+ }
+
+ // Decimals with precision above 18 are not yet supported
+ intercept[RuntimeException] {
+ withTempPath { dir =>
+ makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
+ parquetFile(dir.getCanonicalPath).collect()
+ }
+ }
+
+ // Unlimited-length decimals are not yet supported
+ intercept[RuntimeException] {
+ withTempPath { dir =>
+ makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
+ parquetFile(dir.getCanonicalPath).collect()
+ }
+ }
+ }
+
+ test("map") {
+ val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
+ checkParquetFile(data)
+ }
+
+ test("array") {
+ val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
+ checkParquetFile(data)
+ }
+
+ test("struct") {
+ val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
+ withParquetRDD(data) { rdd =>
+ // Structs are converted to `Row`s
+ checkAnswer(rdd, data.map { case Tuple1(struct) =>
+ Tuple1(Row(struct.productIterator.toSeq: _*))
+ })
+ }
+ }
+
+ test("nested struct with array of array as field") {
+ val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
+ withParquetRDD(data) { rdd =>
+ // Structs are converted to `Row`s
+ checkAnswer(rdd, data.map { case Tuple1(struct) =>
+ Tuple1(Row(struct.productIterator.toSeq: _*))
+ })
+ }
+ }
+
+ test("nested map with struct as value type") {
+ val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
+ withParquetRDD(data) { rdd =>
+ checkAnswer(rdd, data.map { case Tuple1(m) =>
+ Tuple1(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
+ })
+ }
+ }
+
+ test("nulls") {
+ val allNulls = (
+ null.asInstanceOf[java.lang.Boolean],
+ null.asInstanceOf[Integer],
+ null.asInstanceOf[java.lang.Long],
+ null.asInstanceOf[java.lang.Float],
+ null.asInstanceOf[java.lang.Double])
+
+ withParquetRDD(allNulls :: Nil) { rdd =>
+ val rows = rdd.collect()
+ assert(rows.size === 1)
+ assert(rows.head === Row(Seq.fill(5)(null): _*))
+ }
+ }
+
+ test("nones") {
+ val allNones = (
+ None.asInstanceOf[Option[Int]],
+ None.asInstanceOf[Option[Long]],
+ None.asInstanceOf[Option[String]])
+
+ withParquetRDD(allNones :: Nil) { rdd =>
+ val rows = rdd.collect()
+ assert(rows.size === 1)
+ assert(rows.head === Row(Seq.fill(3)(null): _*))
+ }
+ }
+
+ test("compression codec") {
+ def compressionCodecFor(path: String) = {
+ val codecs = ParquetTypesConverter
+ .readMetaData(new Path(path), Some(configuration))
+ .getBlocks
+ .flatMap(_.getColumns)
+ .map(_.getCodec.name())
+ .distinct
+
+ assert(codecs.size === 1)
+ codecs.head
+ }
+
+ val data = (0 until 10).map(i => (i, i.toString))
+
+ def checkCompressionCodec(codec: CompressionCodecName): Unit = {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
+ withParquetFile(data) { path =>
+ assertResult(parquetCompressionCodec.toUpperCase) {
+ compressionCodecFor(path)
+ }
+ }
+ }
+ }
+
+ // Checks default compression codec
+ checkCompressionCodec(CompressionCodecName.fromConf(parquetCompressionCodec))
+
+ checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+ checkCompressionCodec(CompressionCodecName.GZIP)
+ checkCompressionCodec(CompressionCodecName.SNAPPY)
+ }
+
+ test("read raw Parquet file") {
+ def makeRawParquetFile(path: Path): Unit = {
+ val schema = MessageTypeParser.parseMessageType(
+ """
+ |message root {
+ | required boolean _1;
+ | required int32 _2;
+ | required int64 _3;
+ | required float _4;
+ | required double _5;
+ |}
+ """.stripMargin)
+
+ val writeSupport = new TestGroupWriteSupport(schema)
+ val writer = new ParquetWriter[Group](path, writeSupport)
+
+ (0 until 10).foreach { i =>
+ val record = new SimpleGroup(schema)
+ record.add(0, i % 2 == 0)
+ record.add(1, i)
+ record.add(2, i.toLong)
+ record.add(3, i.toFloat)
+ record.add(4, i.toDouble)
+ writer.write(record)
+ }
+
+ writer.close()
+ }
+
+ withTempDir { dir =>
+ val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+ makeRawParquetFile(path)
+ checkAnswer(parquetFile(path.toString), (0 until 10).map { i =>
+ (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+ })
+ }
+ }
+
+ test("write metadata") {
+ withTempPath { file =>
+ val path = new Path(file.toURI.toString)
+ val fs = FileSystem.getLocal(configuration)
+ val attributes = ScalaReflection.attributesFor[(Int, String)]
+ ParquetTypesConverter.writeMetaData(attributes, path, configuration)
+
+ assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
+ assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
+
+ val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration))
+ val actualSchema = metaData.getFileMetaData.getSchema
+ val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)
+
+ actualSchema.checkContains(expectedSchema)
+ expectedSchema.checkContains(actualSchema)
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
new file mode 100644
index 0000000000..daa7ca65cd
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext._
+
+/**
+ * A test suite that tests various Parquet queries.
+ */
+class ParquetQuerySuite2 extends QueryTest with ParquetTest {
+ val sqlContext = TestSQLContext
+
+ test("simple projection") {
+ withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
+ checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
+ }
+ }
+
+ test("insertion") {
+ withTempDir { dir =>
+ val data = (0 until 10).map(i => (i, i.toString))
+ withParquetTable(data, "t") {
+ createParquetFile[(Int, String)](dir.toString).registerTempTable("dest")
+ withTempTable("dest") {
+ sql("INSERT OVERWRITE INTO dest SELECT * FROM t")
+ checkAnswer(table("dest"), data)
+ }
+ }
+ }
+ }
+
+ test("appending") {
+ val data = (0 until 10).map(i => (i, i.toString))
+ withParquetTable(data, "t") {
+ sql("INSERT INTO t SELECT * FROM t")
+ checkAnswer(table("t"), data ++ data)
+ }
+ }
+
+ test("self-join") {
+ // 4 rows; the cells of column 1 in rows 2 and 4 are null
+ val data = (1 to 4).map { i =>
+ val maybeInt = if (i % 2 == 0) None else Some(i)
+ (maybeInt, i.toString)
+ }
+
+ withParquetTable(data, "t") {
+ val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
+ val queryOutput = selfJoin.queryExecution.analyzed.output
+
+ assertResult(4, s"Field count mismatches")(queryOutput.size)
+ assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
+ queryOutput.filter(_.name == "_1").map(_.exprId).size
+ }
+
+ checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
+ }
+ }
+
+ test("nested data - struct with array field") {
+ val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
+ case Tuple1((_, Seq(string))) => Row(string)
+ })
+ }
+ }
+
+ test("nested data - array of struct") {
+ val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
+ case Tuple1(Seq((_, string))) => Row(string)
+ })
+ }
+ }
+
+ test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
+ withParquetTable((1 to 10).map(Tuple1.apply), "t") {
+ checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
+ }
+ }
+
+ test("SPARK-3536 regression: query empty Parquet file shouldn't throw") {
+ withTempDir { dir =>
+ createParquetFile[(Int, String)](dir.toString).registerTempTable("t")
+ withTempTable("t") {
+ checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
+ }
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
new file mode 100644
index 0000000000..34d61bf908
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.scalatest.FunSuite
+import parquet.schema.MessageTypeParser
+
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.types.{BinaryType, IntegerType, StructField, StructType}
+import org.apache.spark.sql.test.TestSQLContext
+
+class ParquetSchemaSuite extends FunSuite with ParquetTest {
+ val sqlContext = TestSQLContext
+
+ /**
+ * Checks whether the reflected Parquet message type for product type `T` conforms to `messageType`.
+ */
+ private def testSchema[T <: Product: ClassTag: TypeTag](
+ testName: String, messageType: String): Unit = {
+ test(testName) {
+ val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T])
+ val expected = MessageTypeParser.parseMessageType(messageType)
+ actual.checkContains(expected)
+ expected.checkContains(actual)
+ }
+ }
+
+ testSchema[(Boolean, Int, Long, Float, Double, Array[Byte])](
+ "basic types",
+ """
+ |message root {
+ | required boolean _1;
+ | required int32 _2;
+ | required int64 _3;
+ | required float _4;
+ | required double _5;
+ | optional binary _6;
+ |}
+ """.stripMargin)
+
+ testSchema[(Byte, Short, Int, Long)](
+ "logical integral types",
+ """
+ |message root {
+ | required int32 _1 (INT_8);
+ | required int32 _2 (INT_16);
+ | required int32 _3 (INT_32);
+ | required int64 _4 (INT_64);
+ |}
+ """.stripMargin)
+
+ // Currently String is the only supported logical binary type.
+ testSchema[Tuple1[String]](
+ "binary logical types",
+ """
+ |message root {
+ | optional binary _1 (UTF8);
+ |}
+ """.stripMargin)
+
+ testSchema[Tuple1[Seq[Int]]](
+ "array",
+ """
+ |message root {
+ | optional group _1 (LIST) {
+ | repeated int32 array;
+ | }
+ |}
+ """.stripMargin)
+
+ testSchema[Tuple1[Map[Int, String]]](
+ "map",
+ """
+ |message root {
+ | optional group _1 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required int32 key;
+ | optional binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchema[Tuple1[Pair[Int, String]]](
+ "struct",
+ """
+ |message root {
+ | optional group _1 {
+ | required int32 _1;
+ | optional binary _2 (UTF8);
+ | }
+ |}
+ """.stripMargin)
+
+ testSchema[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
+ "deeply nested type",
+ """
+ |message root {
+ | optional group _1 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required int32 key;
+ | optional group value {
+ | optional binary _1 (UTF8);
+ | optional group _2 (LIST) {
+ | repeated group bag {
+ | optional group array {
+ | required int32 _1;
+ | required double _2;
+ | }
+ | }
+ | }
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchema[(Option[Int], Map[Int, Option[Double]])](
+ "optional types",
+ """
+ |message root {
+ | optional int32 _1;
+ | optional group _2 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required int32 key;
+ | optional double value;
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ test("DataType string parser compatibility") {
+ val schema = StructType(List(
+ StructField("c1", IntegerType, false),
+ StructField("c2", BinaryType, true)))
+
+ val fromCaseClassString = ParquetTypesConverter.convertFromString(schema.toString)
+ val fromJson = ParquetTypesConverter.convertFromString(schema.json)
+
+ (fromCaseClassString, fromJson).zipped.foreach { (a, b) =>
+ assert(a.name == b.name)
+ assert(a.dataType === b.dataType)
+ assert(a.nullable === b.nullable)
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 6f57fe8958..4bc14bad0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,103 +17,66 @@
package org.apache.spark.sql.parquet
-import java.io.File
-
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
-import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType}
-import org.apache.spark.sql.{parquet, SchemaRDD}
-import org.apache.spark.util.Utils
-
-// Implicits
-import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.hive.test.TestHive
case class Cases(lower: String, UPPER: String)
-class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
-
- val dirname = Utils.createTempDir()
-
- var testRDD: SchemaRDD = null
-
- override def beforeAll() {
- // write test data
- ParquetTestData.writeFile()
- testRDD = parquetFile(ParquetTestData.testDir.toString)
- testRDD.registerTempTable("testsource")
- }
-
- override def afterAll() {
- Utils.deleteRecursively(ParquetTestData.testDir)
- Utils.deleteRecursively(dirname)
- reset() // drop all tables that were registered as part of the tests
- }
-
- // in case tests are failing we delete before and after each test
- override def beforeEach() {
- Utils.deleteRecursively(dirname)
- }
+class HiveParquetSuite extends QueryTest with ParquetTest {
+ val sqlContext = TestHive
- override def afterEach() {
- Utils.deleteRecursively(dirname)
- }
+ import sqlContext._
test("Case insensitive attribute names") {
- val tempFile = File.createTempFile("parquet", "")
- tempFile.delete()
- sparkContext.parallelize(1 to 10)
- .map(_.toString)
- .map(i => Cases(i, i))
- .saveAsParquetFile(tempFile.getCanonicalPath)
-
- parquetFile(tempFile.getCanonicalPath).registerTempTable("cases")
- sql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
- sql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+ withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
+ val expected = (1 to 4).map(i => Row(i.toString))
+ checkAnswer(sql("SELECT upper FROM cases"), expected)
+ checkAnswer(sql("SELECT LOWER FROM cases"), expected)
+ }
}
test("SELECT on Parquet table") {
- val rdd = sql("SELECT * FROM testsource").collect()
- assert(rdd != null)
- assert(rdd.forall(_.size == 6))
+ val data = (1 to 4).map(i => (i, s"val_$i"))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT * FROM t"), data)
+ }
}
test("Simple column projection + filter on Parquet table") {
- val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect()
- assert(rdd.size === 5, "Filter returned incorrect number of rows")
- assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value")
+ withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
+ checkAnswer(
+ sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
+ Seq(Row(true, "val_2"), Row(true, "val_4")))
+ }
}
test("Converting Hive to Parquet Table via saveAsParquetFile") {
- sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath)
- parquetFile(dirname.getAbsolutePath).registerTempTable("ptable")
- val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0))
- val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0))
-
- compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String"))
+ withTempPath { dir =>
+ sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath)
+ parquetFile(dir.getCanonicalPath).registerTempTable("p")
+ withTempTable("p") {
+ checkAnswer(
+ sql("SELECT * FROM src ORDER BY key"),
+ sql("SELECT * from p ORDER BY key").collect().toSeq)
+ }
+ }
}
- test("INSERT OVERWRITE TABLE Parquet table") {
- sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath)
- parquetFile(dirname.getAbsolutePath).registerTempTable("ptable")
- // let's do three overwrites for good measure
- sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
- sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
- sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
- val rddCopy = sql("SELECT * FROM ptable").collect()
- val rddOrig = sql("SELECT * FROM testsource").collect()
- assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??")
- compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
- }
- private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
- var counter = 0
- (rddOne, rddTwo).zipped.foreach {
- (a,b) => (a,b).zipped.toArray.zipWithIndex.foreach {
- case ((value_1, value_2), index) =>
- assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match")
+ test("INSERT OVERWRITE TABLE Parquet table") {
+ withParquetTable((1 to 4).map(i => (i, s"val_$i")), "t") {
+ withTempPath { file =>
+ sql("SELECT * FROM t LIMIT 1").saveAsParquetFile(file.getCanonicalPath)
+ parquetFile(file.getCanonicalPath).registerTempTable("p")
+ withTempTable("p") {
+ // let's do three overwrites for good measure
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq)
+ }
}
- counter = counter + 1
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 488ebba043..fc0e42c201 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -37,7 +37,7 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
* A suite to test the automatic conversion of metastore tables with parquet data to use the
* built in parquet support.
*/
-class ParquetMetastoreSuite extends ParquetTest {
+class ParquetMetastoreSuite extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -112,7 +112,7 @@ class ParquetMetastoreSuite extends ParquetTest {
/**
* A suite of tests for the Parquet support through the data sources API.
*/
-class ParquetSourceSuite extends ParquetTest {
+class ParquetSourceSuite extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -145,7 +145,7 @@ class ParquetSourceSuite extends ParquetTest {
/**
* A collection of tests for parquet data with various forms of partitioning.
*/
-abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
+abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll {
var partitionedTableDir: File = null
var partitionedTableDirWithKey: File = null