author     Cheng Lian <lian@databricks.com>           2014-12-16 21:16:03 -0800
committer  Michael Armbrust <michael@databricks.com>  2014-12-16 21:16:03 -0800
commit     3b395e10510782474789c9098084503f98ca4830 (patch)
tree       6a3193af08fb8c82f8f8f6cab6fc2a5ed5c0fbc3 /sql/hive
parent     b85044ecfa825ff68c8e57eeffa4d9f214335e66 (diff)
[SPARK-4798][SQL] A new set of Parquet testing API and test suites
This PR provides a set of Parquet testing APIs (see trait `ParquetTest`) that enable developers to write more concise test cases. A new set of Parquet test suites built upon this API is added, aiming to replace the old `ParquetQuerySuite`. To avoid potential merge conflicts, the old testing code is not removed yet. The following classes can be safely removed after most Parquet-related PRs are handled:

- `ParquetQuerySuite`
- `ParquetTestData`

Author: Cheng Lian <lian@databricks.com>

Closes #3644 from liancheng/parquet-tests and squashes the following commits:

800e745 [Cheng Lian] Enforces ordering of test output
3bb8731 [Cheng Lian] Refactors HiveParquetSuite
aa2cb2e [Cheng Lian] Decouples ParquetTest and TestSQLContext
7b43a68 [Cheng Lian] Updates ParquetTest Scaladoc
7f07af0 [Cheng Lian] Adds a new set of Parquet test suites
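To illustrate the claimed conciseness, here is a minimal sketch of a suite written against the new trait. The suite name and test body are illustrative; the `withParquetTable`/`checkAnswer` usage mirrors the rewritten `HiveParquetSuite` in the diff below:

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.hive.test.TestHive

// Illustrative suite: ParquetTest supplies self-cleaning fixtures, so no
// beforeAll/afterAll plumbing or manual temp-file management is needed.
class ExampleParquetSuite extends QueryTest with ParquetTest {
  val sqlContext = TestHive

  import sqlContext._

  test("projection over a generated Parquet table") {
    // withParquetTable writes the tuples to a temporary Parquet file,
    // registers it as table `t`, runs the body, then cleans everything up.
    withParquetTable((1 to 4).map(i => (i, s"val_$i")), "t") {
      checkAnswer(sql("SELECT `_2` FROM t"), (1 to 4).map(i => Row(s"val_$i")))
    }
  }
}
```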
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala | 119
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala    |   6
2 files changed, 44 insertions(+), 81 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 6f57fe8958..4bc14bad0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,103 +17,66 @@
package org.apache.spark.sql.parquet
-import java.io.File
-
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
-import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType}
-import org.apache.spark.sql.{parquet, SchemaRDD}
-import org.apache.spark.util.Utils
-
-// Implicits
-import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.hive.test.TestHive
case class Cases(lower: String, UPPER: String)
-class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
-
- val dirname = Utils.createTempDir()
-
- var testRDD: SchemaRDD = null
-
- override def beforeAll() {
- // write test data
- ParquetTestData.writeFile()
- testRDD = parquetFile(ParquetTestData.testDir.toString)
- testRDD.registerTempTable("testsource")
- }
-
- override def afterAll() {
- Utils.deleteRecursively(ParquetTestData.testDir)
- Utils.deleteRecursively(dirname)
- reset() // drop all tables that were registered as part of the tests
- }
-
- // in case tests are failing we delete before and after each test
- override def beforeEach() {
- Utils.deleteRecursively(dirname)
- }
+class HiveParquetSuite extends QueryTest with ParquetTest {
+ val sqlContext = TestHive
- override def afterEach() {
- Utils.deleteRecursively(dirname)
- }
+ import sqlContext._
test("Case insensitive attribute names") {
- val tempFile = File.createTempFile("parquet", "")
- tempFile.delete()
- sparkContext.parallelize(1 to 10)
- .map(_.toString)
- .map(i => Cases(i, i))
- .saveAsParquetFile(tempFile.getCanonicalPath)
-
- parquetFile(tempFile.getCanonicalPath).registerTempTable("cases")
- sql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
- sql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+ withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
+ val expected = (1 to 4).map(i => Row(i.toString))
+ checkAnswer(sql("SELECT upper FROM cases"), expected)
+ checkAnswer(sql("SELECT LOWER FROM cases"), expected)
+ }
}
test("SELECT on Parquet table") {
- val rdd = sql("SELECT * FROM testsource").collect()
- assert(rdd != null)
- assert(rdd.forall(_.size == 6))
+ val data = (1 to 4).map(i => (i, s"val_$i"))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT * FROM t"), data)
+ }
}
test("Simple column projection + filter on Parquet table") {
- val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect()
- assert(rdd.size === 5, "Filter returned incorrect number of rows")
- assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value")
+ withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
+ checkAnswer(
+ sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
+ Seq(Row(true, "val_2"), Row(true, "val_4")))
+ }
}
test("Converting Hive to Parquet Table via saveAsParquetFile") {
- sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath)
- parquetFile(dirname.getAbsolutePath).registerTempTable("ptable")
- val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0))
- val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0))
-
- compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String"))
+ withTempPath { dir =>
+ sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath)
+ parquetFile(dir.getCanonicalPath).registerTempTable("p")
+ withTempTable("p") {
+ checkAnswer(
+ sql("SELECT * FROM src ORDER BY key"),
+ sql("SELECT * from p ORDER BY key").collect().toSeq)
+ }
+ }
}
- test("INSERT OVERWRITE TABLE Parquet table") {
- sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath)
- parquetFile(dirname.getAbsolutePath).registerTempTable("ptable")
- // let's do three overwrites for good measure
- sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
- sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
- sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
- val rddCopy = sql("SELECT * FROM ptable").collect()
- val rddOrig = sql("SELECT * FROM testsource").collect()
- assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??")
- compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
- }
- private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
- var counter = 0
- (rddOne, rddTwo).zipped.foreach {
- (a,b) => (a,b).zipped.toArray.zipWithIndex.foreach {
- case ((value_1, value_2), index) =>
- assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match")
+ test("INSERT OVERWRITE TABLE Parquet table") {
+ withParquetTable((1 to 4).map(i => (i, s"val_$i")), "t") {
+ withTempPath { file =>
+ sql("SELECT * FROM t LIMIT 1").saveAsParquetFile(file.getCanonicalPath)
+ parquetFile(file.getCanonicalPath).registerTempTable("p")
+ withTempTable("p") {
+ // let's do three overwrites for good measure
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq)
+ }
}
- counter = counter + 1
}
}
}
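The fixture helpers used above (`withParquetTable`, `withTempPath`, `withTempTable`) are defined in the new `ParquetTest` trait in sql/core, which is outside this sql/hive diffstat. A rough sketch of their shape, inferred from the call sites above, might look like the following; the bodies are assumptions, and in particular the `dropTempTable` cleanup call stands in for whatever the real trait uses:

```scala
import java.io.File

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.sql.SQLContext
import org.apache.spark.util.Utils

// Rough sketch of the loan-pattern fixtures, inferred from the call sites
// above. The real trait ships in sql/core and is not part of this diff.
trait ParquetTestSketch {
  // Concrete suites bind this, e.g. to TestHive or TestSQLContext.
  val sqlContext: SQLContext

  import sqlContext._

  // Hands `f` a path that does not yet exist and deletes it afterwards.
  protected def withTempPath(f: File => Unit): Unit = {
    val dir = Utils.createTempDir()
    dir.delete()
    try f(dir) finally Utils.deleteRecursively(dir)
  }

  // Runs `f`, then drops the temp table even if `f` throws. The cleanup
  // call is an assumption; the diff does not show how the real trait does it.
  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
    try f finally dropTempTable(tableName)
  }

  // Saves `data` as a Parquet file, registers it as `tableName`, runs `f`.
  protected def withParquetTable[T <: Product : ClassTag : TypeTag](
      data: Seq[T], tableName: String)(f: => Unit): Unit = {
    withTempPath { file =>
      sparkContext.parallelize(data).saveAsParquetFile(file.getCanonicalPath)
      parquetFile(file.getCanonicalPath).registerTempTable(tableName)
      withTempTable(tableName)(f)
    }
  }
}
```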
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 488ebba043..fc0e42c201 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -37,7 +37,7 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
* A suite to test the automatic conversion of metastore tables with parquet data to use the
* built in parquet support.
*/
-class ParquetMetastoreSuite extends ParquetTest {
+class ParquetMetastoreSuite extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -112,7 +112,7 @@ class ParquetMetastoreSuite extends ParquetTest {
/**
* A suite of tests for the Parquet support through the data sources API.
*/
-class ParquetSourceSuite extends ParquetTest {
+class ParquetSourceSuite extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -145,7 +145,7 @@ class ParquetSourceSuite extends ParquetTest {
/**
* A collection of tests for parquet data with various forms of partitioning.
*/
-abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
+abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll {
var partitionedTableDir: File = null
var partitionedTableDirWithKey: File = null