| author | Cheng Lian <lian@databricks.com> | 2014-12-16 21:16:03 -0800 |
|---|---|---|
| committer | Michael Armbrust <michael@databricks.com> | 2014-12-16 21:16:03 -0800 |
| commit | 3b395e10510782474789c9098084503f98ca4830 (patch) | |
| tree | 6a3193af08fb8c82f8f8f6cab6fc2a5ed5c0fbc3 /sql/hive | |
| parent | b85044ecfa825ff68c8e57eeffa4d9f214335e66 (diff) | |
[SPARK-4798][SQL] A new set of Parquet testing API and test suites
This PR provides a Parquet testing API (see trait `ParquetTest`) that enables developers to write more concise test cases. A new set of Parquet test suites built upon this API is added, aiming to replace the old `ParquetQuerySuite`. To avoid potential merge conflicts, the old testing code is not removed yet. The following classes can be safely removed once most Parquet-related PRs are handled (a sketch of the helper pattern follows this list):
- `ParquetQuerySuite`
- `ParquetTestData`
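
The helpers used throughout the diff below (`withParquetTable`, `withTempPath`, `withTempTable`) follow the loan pattern: acquire a resource, lend it to the test body, and clean up in a `finally`. The following is a minimal, self-contained sketch of that pattern; the trait name `ParquetTestSketch` and its abstract hooks are invented for illustration, and the real `ParquetTest` trait (whose definition is not part of the `sql/hive` portion of this diff) is certainly more complete:

```scala
import java.io.File

// A minimal sketch of the loan pattern behind ParquetTest's helpers.
// Names mirror those used in the diff; the implementations here are
// illustrative stand-ins, not the actual Spark code.
trait ParquetTestSketch {
  // Hooks that a concrete suite would wire up to its SQLContext (assumed).
  protected def createParquetTable[T](data: Seq[T], tableName: String): Unit
  protected def dropTempTable(tableName: String): Unit
  protected def deleteRecursively(path: File): Unit

  // Lends a fresh, non-existent temporary path to `f`, then cleans up
  // whether or not the test body throws.
  protected def withTempPath(f: File => Unit): Unit = {
    val path = File.createTempFile("parquetTest", "")
    path.delete() // keep only the unused path; the callee creates the file
    try f(path) finally deleteRecursively(path)
  }

  // Guarantees the named temporary table is dropped after the body runs.
  protected def withTempTable(tableName: String)(f: => Unit): Unit =
    try f finally dropTempTable(tableName)

  // Registers `data` as a Parquet-backed temp table, runs the body, and
  // drops the table: the shape of `withParquetTable(data, "t") { ... }`.
  protected def withParquetTable[T](data: Seq[T], tableName: String)(f: => Unit): Unit = {
    createParquetTable(data, tableName)
    withTempTable(tableName)(f)
  }
}
```

Because setup and cleanup live inside the helpers rather than in `beforeAll`/`afterEach`, each test case states only its data and its assertions, which is what makes the rewritten suites in the diff below so much shorter.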
Author: Cheng Lian <lian@databricks.com>
Closes #3644 from liancheng/parquet-tests and squashes the following commits:
800e745 [Cheng Lian] Enforces ordering of test output
3bb8731 [Cheng Lian] Refactors HiveParquetSuite
aa2cb2e [Cheng Lian] Decouples ParquetTest and TestSQLContext
7b43a68 [Cheng Lian] Updates ParquetTest Scaladoc
7f07af0 [Cheng Lian] Adds a new set of Parquet test suites
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala | 119 |
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala | 6 |
2 files changed, 44 insertions, 81 deletions
```diff
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 6f57fe8958..4bc14bad0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,103 +17,66 @@
 package org.apache.spark.sql.parquet
 
-import java.io.File
-
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
-import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType}
-import org.apache.spark.sql.{parquet, SchemaRDD}
-import org.apache.spark.util.Utils
-
-// Implicits
-import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.hive.test.TestHive
 
 case class Cases(lower: String, UPPER: String)
 
-class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
-
-  val dirname = Utils.createTempDir()
-
-  var testRDD: SchemaRDD = null
-
-  override def beforeAll() {
-    // write test data
-    ParquetTestData.writeFile()
-    testRDD = parquetFile(ParquetTestData.testDir.toString)
-    testRDD.registerTempTable("testsource")
-  }
-
-  override def afterAll() {
-    Utils.deleteRecursively(ParquetTestData.testDir)
-    Utils.deleteRecursively(dirname)
-    reset() // drop all tables that were registered as part of the tests
-  }
-
-  // in case tests are failing we delete before and after each test
-  override def beforeEach() {
-    Utils.deleteRecursively(dirname)
-  }
+class HiveParquetSuite extends QueryTest with ParquetTest {
+  val sqlContext = TestHive
 
-  override def afterEach() {
-    Utils.deleteRecursively(dirname)
-  }
+  import sqlContext._
 
   test("Case insensitive attribute names") {
-    val tempFile = File.createTempFile("parquet", "")
-    tempFile.delete()
-    sparkContext.parallelize(1 to 10)
-      .map(_.toString)
-      .map(i => Cases(i, i))
-      .saveAsParquetFile(tempFile.getCanonicalPath)
-
-    parquetFile(tempFile.getCanonicalPath).registerTempTable("cases")
-    sql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
-    sql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+    withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
+      val expected = (1 to 4).map(i => Row(i.toString))
+      checkAnswer(sql("SELECT upper FROM cases"), expected)
+      checkAnswer(sql("SELECT LOWER FROM cases"), expected)
+    }
   }
 
   test("SELECT on Parquet table") {
-    val rdd = sql("SELECT * FROM testsource").collect()
-    assert(rdd != null)
-    assert(rdd.forall(_.size == 6))
+    val data = (1 to 4).map(i => (i, s"val_$i"))
+    withParquetTable(data, "t") {
+      checkAnswer(sql("SELECT * FROM t"), data)
+    }
   }
 
   test("Simple column projection + filter on Parquet table") {
-    val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect()
-    assert(rdd.size === 5, "Filter returned incorrect number of rows")
-    assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value")
+    withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
+      checkAnswer(
+        sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
+        Seq(Row(true, "val_2"), Row(true, "val_4")))
+    }
   }
 
   test("Converting Hive to Parquet Table via saveAsParquetFile") {
-    sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath)
-    parquetFile(dirname.getAbsolutePath).registerTempTable("ptable")
-    val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0))
-    val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0))
-
-    compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String"))
+    withTempPath { dir =>
+      sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath)
+      parquetFile(dir.getCanonicalPath).registerTempTable("p")
+      withTempTable("p") {
+        checkAnswer(
+          sql("SELECT * FROM src ORDER BY key"),
+          sql("SELECT * from p ORDER BY key").collect().toSeq)
+      }
+    }
   }
 
-  test("INSERT OVERWRITE TABLE Parquet table") {
-    sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath)
-    parquetFile(dirname.getAbsolutePath).registerTempTable("ptable")
-    // let's do three overwrites for good measure
-    sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
-    sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
-    sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
-    val rddCopy = sql("SELECT * FROM ptable").collect()
-    val rddOrig = sql("SELECT * FROM testsource").collect()
-    assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??")
-    compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
-  }
-
-  private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
-    var counter = 0
-    (rddOne, rddTwo).zipped.foreach {
-      (a,b) => (a,b).zipped.toArray.zipWithIndex.foreach {
-        case ((value_1, value_2), index) =>
-          assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match")
+  test("INSERT OVERWRITE TABLE Parquet table") {
+    withParquetTable((1 to 4).map(i => (i, s"val_$i")), "t") {
+      withTempPath { file =>
+        sql("SELECT * FROM t LIMIT 1").saveAsParquetFile(file.getCanonicalPath)
+        parquetFile(file.getCanonicalPath).registerTempTable("p")
+        withTempTable("p") {
+          // let's do three overwrites for good measure
+          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+          checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq)
+        }
       }
-      counter = counter + 1
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 488ebba043..fc0e42c201 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -37,7 +37,7 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
  * built in parquet support.
  */
-class ParquetMetastoreSuite extends ParquetTest {
+class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -112,7 +112,7 @@ class ParquetMetastoreSuite extends ParquetTest {
 /**
  * A suite of tests for the Parquet support through the data sources API.
  */
-class ParquetSourceSuite extends ParquetTest {
+class ParquetSourceSuite extends ParquetPartitioningTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -145,7 +145,7 @@ class ParquetSourceSuite extends ParquetTest {
 /**
  * A collection of tests for parquet data with various forms of partitioning.
  */
-abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
+abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll {
   var partitionedTableDir: File = null
   var partitionedTableDirWithKey: File = null
```