From 3b395e10510782474789c9098084503f98ca4830 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 16 Dec 2014 21:16:03 -0800 Subject: [SPARK-4798][SQL] A new set of Parquet testing API and test suites This PR provides a set Parquet testing API (see trait `ParquetTest`) that enables developers to write more concise test cases. A new set of Parquet test suites built upon this API are added and aim to replace the old `ParquetQuerySuite`. To avoid potential merge conflicts, old testing code are not removed yet. The following classes can be safely removed after most Parquet related PRs are handled: - `ParquetQuerySuite` - `ParquetTestData` [Review on Reviewable](https://reviewable.io/reviews/apache/spark/3644) Author: Cheng Lian Closes #3644 from liancheng/parquet-tests and squashes the following commits: 800e745 [Cheng Lian] Enforces ordering of test output 3bb8731 [Cheng Lian] Refactors HiveParquetSuite aa2cb2e [Cheng Lian] Decouples ParquetTest and TestSQLContext 7b43a68 [Cheng Lian] Updates ParquetTest Scaladoc 7f07af0 [Cheng Lian] Adds a new set of Parquet test suites --- .../main/scala/org/apache/spark/sql/SQLConf.scala | 4 + .../org/apache/spark/sql/parquet/ParquetTest.scala | 127 +++++++++ .../spark/sql/parquet/ParquetFilterSuite.scala | 253 ++++++++++++++++++ .../apache/spark/sql/parquet/ParquetIOSuite.scala | 287 +++++++++++++++++++++ .../spark/sql/parquet/ParquetQuerySuite2.scala | 110 ++++++++ .../spark/sql/parquet/ParquetSchemaSuite.scala | 164 ++++++++++++ .../spark/sql/parquet/HiveParquetSuite.scala | 119 +++------ .../apache/spark/sql/parquet/parquetSuites.scala | 6 +- 8 files changed, 989 insertions(+), 81 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index f5abf71d6c..f5bf935522 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -188,6 +188,10 @@ private[sql] trait SQLConf { */ def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap } + private[spark] def unsetConf(key: String) { + settings -= key + } + private[spark] def clear() { settings.clear() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala new file mode 100644 index 0000000000..b4d48902fd --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import java.io.File + +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag +import scala.util.Try + +import org.apache.spark.sql.{SQLContext, SchemaRDD} +import org.apache.spark.sql.catalyst.util +import org.apache.spark.util.Utils + +/** + * A helper trait that provides convenient facilities for Parquet testing. + * + * NOTE: Considering classes `Tuple1` ... `Tuple22` all extend `Product`, it would be more + * convenient to use tuples rather than special case classes when writing test cases/suites. + * Especially, `Tuple1.apply` can be used to easily wrap a single type/value. + */ +trait ParquetTest { + val sqlContext: SQLContext + + import sqlContext._ + + protected def configuration = sparkContext.hadoopConfiguration + + /** + * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL + * configurations. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { + val (keys, values) = pairs.unzip + val currentValues = keys.map(key => Try(getConf(key)).toOption) + (keys, values).zipped.foreach(setConf) + try f finally { + keys.zip(currentValues).foreach { + case (key, Some(value)) => setConf(key, value) + case (key, None) => unsetConf(key) + } + } + } + + /** + * Generates a temporary path without creating the actual file/directory, then pass it to `f`. If + * a file/directory is created there by `f`, it will be delete after `f` returns. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withTempPath(f: File => Unit): Unit = { + val file = util.getTempFilePath("parquetTest").getCanonicalFile + try f(file) finally if (file.exists()) Utils.deleteRecursively(file) + } + + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withTempDir(f: File => Unit): Unit = { + val dir = Utils.createTempDir().getCanonicalFile + try f(dir) finally Utils.deleteRecursively(dir) + } + + /** + * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f` + * returns. + */ + protected def withParquetFile[T <: Product: ClassTag: TypeTag] + (data: Seq[T]) + (f: String => Unit): Unit = { + withTempPath { file => + sparkContext.parallelize(data).saveAsParquetFile(file.getCanonicalPath) + f(file.getCanonicalPath) + } + } + + /** + * Writes `data` to a Parquet file and reads it back as a SchemaRDD, which is then passed to `f`. + * The Parquet file will be deleted after `f` returns. + */ + protected def withParquetRDD[T <: Product: ClassTag: TypeTag] + (data: Seq[T]) + (f: SchemaRDD => Unit): Unit = { + withParquetFile(data)(path => f(parquetFile(path))) + } + + /** + * Drops temporary table `tableName` after calling `f`. 
+ */ + protected def withTempTable(tableName: String)(f: => Unit): Unit = { + try f finally dropTempTable(tableName) + } + + /** + * Writes `data` to a Parquet file, reads it back as a SchemaRDD and registers it as a temporary + * table named `tableName`, then call `f`. The temporary table together with the Parquet file will + * be dropped/deleted after `f` returns. + */ + protected def withParquetTable[T <: Product: ClassTag: TypeTag] + (data: Seq[T], tableName: String) + (f: => Unit): Unit = { + withParquetRDD(data) { rdd => + rdd.registerTempTable(tableName) + withTempTable(tableName)(f) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala new file mode 100644 index 0000000000..111a459e6d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import parquet.filter2.predicate.Operators._ +import parquet.filter2.predicate.{FilterPredicate, Operators} + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate, Row} +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD} + +/** + * A test suite that tests Parquet filter2 API based filter pushdown optimization. + * + * Notice that `!(a cmp b)` are always transformed to its negated form `a cmp' b` by the + * `BooleanSimplification` optimization rule whenever possible. As a result, predicate `!(a < 1)` + * results a `GtEq` filter predicate rather than a `Not`. 
+ * + * @todo Add test cases for `IsNull` and `IsNotNull` after merging PR #3367 + */ +class ParquetFilterSuite extends QueryTest with ParquetTest { + val sqlContext = TestSQLContext + + private def checkFilterPushdown( + rdd: SchemaRDD, + output: Seq[Symbol], + predicate: Predicate, + filterClass: Class[_ <: FilterPredicate], + checker: (SchemaRDD, Any) => Unit, + expectedResult: => Any): Unit = { + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") { + val query = rdd.select(output.map(_.attr): _*).where(predicate) + + val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect { + case plan: ParquetTableScan => plan.columnPruningPred + }.flatten.reduceOption(_ && _) + + assert(maybeAnalyzedPredicate.isDefined) + maybeAnalyzedPredicate.foreach { pred => + val maybeFilter = ParquetFilters.createFilter(pred) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") + maybeFilter.foreach(f => assert(f.getClass === filterClass)) + } + + checker(query, expectedResult) + } + } + + private def checkFilterPushdown + (rdd: SchemaRDD, output: Symbol*) + (predicate: Predicate, filterClass: Class[_ <: FilterPredicate]) + (expectedResult: => Any): Unit = { + checkFilterPushdown(rdd, output, predicate, filterClass, checkAnswer _, expectedResult) + } + + def checkBinaryFilterPushdown + (rdd: SchemaRDD, output: Symbol*) + (predicate: Predicate, filterClass: Class[_ <: FilterPredicate]) + (expectedResult: => Any): Unit = { + def checkBinaryAnswer(rdd: SchemaRDD, result: Any): Unit = { + val actual = rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq + val expected = result match { + case s: Seq[_] => s.map(_.asInstanceOf[Row].getAs[Array[Byte]](0).mkString(",")) + case s => Seq(s.asInstanceOf[Array[Byte]].mkString(",")) + } + assert(actual.sorted === expected.sorted) + } + checkFilterPushdown(rdd, output, predicate, filterClass, checkBinaryAnswer _, expectedResult) + } + + test("filter pushdown - boolean") { + withParquetRDD((true :: false :: Nil).map(Tuple1.apply)) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(true) + checkFilterPushdown(rdd, '_1)('_1 !== true, classOf[Operators.Not])(false) + } + } + + test("filter pushdown - integer") { + withParquetRDD((1 to 4).map(Tuple1.apply)) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1) + checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) { + (2 to 4).map(Row.apply(_)) + } + + checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [Integer]])(1) + checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [Integer]])(4) + checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[Integer]])(1) + checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[Integer]])(4) + + checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [Integer]])(4) + checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[Integer]])(4) + + checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[Integer]])(4) + checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3) + checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) { + Seq(Row(1), Row(4)) + } + } + } + + test("filter pushdown - long") { + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toLong))) { rdd => + checkFilterPushdown(rdd, 
'_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) { + (2 to 4).map(Row.apply(_)) + } + + checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Long]])(4) + checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Long]])(4) + + checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Long]])(4) + checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Long]])(4) + + checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Long]])(4) + checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3) + checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) { + Seq(Row(1), Row(4)) + } + } + } + + test("filter pushdown - float") { + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toFloat))) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) { + (2 to 4).map(Row.apply(_)) + } + + checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Float]])(4) + checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Float]])(4) + + checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Float]])(4) + checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Float]])(4) + + checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Float]])(4) + checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3) + checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) { + Seq(Row(1), Row(4)) + } + } + } + + test("filter pushdown - double") { + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toDouble))) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) { + (2 to 4).map(Row.apply(_)) + } + + checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Double]])(4) + checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Double]])(4) + + checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq[Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Double]])(4) + checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Double]])(4) + + checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Double]])(4) 
+ checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3) + checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) { + Seq(Row(1), Row(4)) + } + } + } + + test("filter pushdown - string") { + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toString))) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === "1", classOf[Eq[String]])("1") + checkFilterPushdown(rdd, '_1)('_1 !== "1", classOf[Operators.Not]) { + (2 to 4).map(i => Row.apply(i.toString)) + } + + checkFilterPushdown(rdd, '_1)('_1 < "2", classOf[Lt [java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)('_1 > "3", classOf[Gt [java.lang.String]])("4") + checkFilterPushdown(rdd, '_1)('_1 <= "1", classOf[LtEq[java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)('_1 >= "4", classOf[GtEq[java.lang.String]])("4") + + checkFilterPushdown(rdd, '_1)(Literal("1") === '_1, classOf[Eq [java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)(Literal("2") > '_1, classOf[Lt [java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)(Literal("3") < '_1, classOf[Gt [java.lang.String]])("4") + checkFilterPushdown(rdd, '_1)(Literal("1") >= '_1, classOf[LtEq[java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)(Literal("4") <= '_1, classOf[GtEq[java.lang.String]])("4") + + checkFilterPushdown(rdd, '_1)(!('_1 < "4"), classOf[Operators.GtEq[java.lang.String]])("4") + checkFilterPushdown(rdd, '_1)('_1 > "2" && '_1 < "4", classOf[Operators.And])("3") + checkFilterPushdown(rdd, '_1)('_1 < "2" || '_1 > "3", classOf[Operators.Or]) { + Seq(Row("1"), Row("4")) + } + } + } + + test("filter pushdown - binary") { + implicit class IntToBinary(int: Int) { + def b: Array[Byte] = int.toString.getBytes("UTF-8") + } + + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.b))) { rdd => + checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.Not]) { + (2 to 4).map(i => Row.apply(i.b)).toSeq + } + + checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b, classOf[Lt [Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 > 3.b, classOf[Gt [Array[Byte]]])(4.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 <= 1.b, classOf[LtEq[Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 >= 4.b, classOf[GtEq[Array[Byte]]])(4.b) + + checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) === '_1, classOf[Eq [Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)(Literal(2.b) > '_1, classOf[Lt [Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)(Literal(3.b) < '_1, classOf[Gt [Array[Byte]]])(4.b) + checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) >= '_1, classOf[LtEq[Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)(Literal(4.b) <= '_1, classOf[GtEq[Array[Byte]]])(4.b) + + checkBinaryFilterPushdown(rdd, '_1)(!('_1 < 4.b), classOf[Operators.GtEq[Array[Byte]]])(4.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 > 2.b && '_1 < 4.b, classOf[Operators.And])(3.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b || '_1 > 3.b, classOf[Operators.Or]) { + Seq(Row(1.b), Row(4.b)) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala new file mode 100644 index 0000000000..10a01474e9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import parquet.example.data.simple.SimpleGroup +import parquet.example.data.{Group, GroupWriter} +import parquet.hadoop.api.WriteSupport +import parquet.hadoop.api.WriteSupport.WriteContext +import parquet.hadoop.metadata.CompressionCodecName +import parquet.hadoop.{ParquetFileWriter, ParquetWriter} +import parquet.io.api.RecordConsumer +import parquet.schema.{MessageType, MessageTypeParser} + +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.catalyst.types.DecimalType +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD} + +// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport +// with an empty configuration (it is after all not intended to be used in this way?) +// and members are private so we need to make our own in order to pass the schema +// to the writer. +private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] { + var groupWriter: GroupWriter = null + + override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { + groupWriter = new GroupWriter(recordConsumer, schema) + } + + override def init(configuration: Configuration): WriteContext = { + new WriteContext(schema, new java.util.HashMap[String, String]()) + } + + override def write(record: Group) { + groupWriter.write(record) + } +} + +/** + * A test suite that tests basic Parquet I/O. + */ +class ParquetIOSuite extends QueryTest with ParquetTest { + val sqlContext = TestSQLContext + + /** + * Writes `data` to a Parquet file, reads it back and check file contents. + */ + protected def checkParquetFile[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = { + withParquetRDD(data)(checkAnswer(_, data)) + } + + test("basic data types (without binary)") { + val data = (1 to 4).map { i => + (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) + } + checkParquetFile(data) + } + + test("raw binary") { + val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte))) + withParquetRDD(data) { rdd => + assertResult(data.map(_._1.mkString(",")).sorted) { + rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted + } + } + } + + test("string") { + val data = (1 to 4).map(i => Tuple1(i.toString)) + // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL + // as we store Spark SQL schema in the extra metadata. 
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data)) + withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data)) + } + + test("fixed-length decimals") { + def makeDecimalRDD(decimal: DecimalType): SchemaRDD = + sparkContext + .parallelize(0 to 1000) + .map(i => Tuple1(i / 100.0)) + .select('_1 cast decimal) + + for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) { + withTempPath { dir => + val data = makeDecimalRDD(DecimalType(precision, scale)) + data.saveAsParquetFile(dir.getCanonicalPath) + checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq) + } + } + + // Decimals with precision above 18 are not yet supported + intercept[RuntimeException] { + withTempPath { dir => + makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath) + parquetFile(dir.getCanonicalPath).collect() + } + } + + // Unlimited-length decimals are not yet supported + intercept[RuntimeException] { + withTempPath { dir => + makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath) + parquetFile(dir.getCanonicalPath).collect() + } + } + } + + test("map") { + val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i"))) + checkParquetFile(data) + } + + test("array") { + val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1))) + checkParquetFile(data) + } + + test("struct") { + val data = (1 to 4).map(i => Tuple1((i, s"val_$i"))) + withParquetRDD(data) { rdd => + // Structs are converted to `Row`s + checkAnswer(rdd, data.map { case Tuple1(struct) => + Tuple1(Row(struct.productIterator.toSeq: _*)) + }) + } + } + + test("nested struct with array of array as field") { + val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i"))))) + withParquetRDD(data) { rdd => + // Structs are converted to `Row`s + checkAnswer(rdd, data.map { case Tuple1(struct) => + Tuple1(Row(struct.productIterator.toSeq: _*)) + }) + } + } + + test("nested map with struct as value type") { + val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i")))) + withParquetRDD(data) { rdd => + checkAnswer(rdd, data.map { case Tuple1(m) => + Tuple1(m.mapValues(struct => Row(struct.productIterator.toSeq: _*))) + }) + } + } + + test("nulls") { + val allNulls = ( + null.asInstanceOf[java.lang.Boolean], + null.asInstanceOf[Integer], + null.asInstanceOf[java.lang.Long], + null.asInstanceOf[java.lang.Float], + null.asInstanceOf[java.lang.Double]) + + withParquetRDD(allNulls :: Nil) { rdd => + val rows = rdd.collect() + assert(rows.size === 1) + assert(rows.head === Row(Seq.fill(5)(null): _*)) + } + } + + test("nones") { + val allNones = ( + None.asInstanceOf[Option[Int]], + None.asInstanceOf[Option[Long]], + None.asInstanceOf[Option[String]]) + + withParquetRDD(allNones :: Nil) { rdd => + val rows = rdd.collect() + assert(rows.size === 1) + assert(rows.head === Row(Seq.fill(3)(null): _*)) + } + } + + test("compression codec") { + def compressionCodecFor(path: String) = { + val codecs = ParquetTypesConverter + .readMetaData(new Path(path), Some(configuration)) + .getBlocks + .flatMap(_.getColumns) + .map(_.getCodec.name()) + .distinct + + assert(codecs.size === 1) + codecs.head + } + + val data = (0 until 10).map(i => (i, i.toString)) + + def checkCompressionCodec(codec: CompressionCodecName): Unit = { + withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) { + withParquetFile(data) { path => + assertResult(parquetCompressionCodec.toUpperCase) { + compressionCodecFor(path) + } + } + } + } + + // Checks default compression codec + 
checkCompressionCodec(CompressionCodecName.fromConf(parquetCompressionCodec)) + + checkCompressionCodec(CompressionCodecName.UNCOMPRESSED) + checkCompressionCodec(CompressionCodecName.GZIP) + checkCompressionCodec(CompressionCodecName.SNAPPY) + } + + test("read raw Parquet file") { + def makeRawParquetFile(path: Path): Unit = { + val schema = MessageTypeParser.parseMessageType( + """ + |message root { + | required boolean _1; + | required int32 _2; + | required int64 _3; + | required float _4; + | required double _5; + |} + """.stripMargin) + + val writeSupport = new TestGroupWriteSupport(schema) + val writer = new ParquetWriter[Group](path, writeSupport) + + (0 until 10).foreach { i => + val record = new SimpleGroup(schema) + record.add(0, i % 2 == 0) + record.add(1, i) + record.add(2, i.toLong) + record.add(3, i.toFloat) + record.add(4, i.toDouble) + writer.write(record) + } + + writer.close() + } + + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + makeRawParquetFile(path) + checkAnswer(parquetFile(path.toString), (0 until 10).map { i => + (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) + }) + } + } + + test("write metadata") { + withTempPath { file => + val path = new Path(file.toURI.toString) + val fs = FileSystem.getLocal(configuration) + val attributes = ScalaReflection.attributesFor[(Int, String)] + ParquetTypesConverter.writeMetaData(attributes, path, configuration) + + assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) + assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) + + val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration)) + val actualSchema = metaData.getFileMetaData.getSchema + val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes) + + actualSchema.checkContains(expectedSchema) + expectedSchema.checkContains(actualSchema) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala new file mode 100644 index 0000000000..daa7ca65cd --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.test.TestSQLContext._ + +/** + * A test suite that tests various Parquet queries. 
+ */ +class ParquetQuerySuite2 extends QueryTest with ParquetTest { + val sqlContext = TestSQLContext + + test("simple projection") { + withParquetTable((0 until 10).map(i => (i, i.toString)), "t") { + checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_))) + } + } + + test("insertion") { + withTempDir { dir => + val data = (0 until 10).map(i => (i, i.toString)) + withParquetTable(data, "t") { + createParquetFile[(Int, String)](dir.toString).registerTempTable("dest") + withTempTable("dest") { + sql("INSERT OVERWRITE INTO dest SELECT * FROM t") + checkAnswer(table("dest"), data) + } + } + } + } + + test("appending") { + val data = (0 until 10).map(i => (i, i.toString)) + withParquetTable(data, "t") { + sql("INSERT INTO t SELECT * FROM t") + checkAnswer(table("t"), data ++ data) + } + } + + test("self-join") { + // 4 rows, cells of column 1 of row 2 and row 4 are null + val data = (1 to 4).map { i => + val maybeInt = if (i % 2 == 0) None else Some(i) + (maybeInt, i.toString) + } + + withParquetTable(data, "t") { + val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1") + val queryOutput = selfJoin.queryExecution.analyzed.output + + assertResult(4, s"Field count mismatches")(queryOutput.size) + assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") { + queryOutput.filter(_.name == "_1").map(_.exprId).size + } + + checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3"))) + } + } + + test("nested data - struct with array field") { + val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i")))) + withParquetTable(data, "t") { + checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map { + case Tuple1((_, Seq(string))) => Row(string) + }) + } + } + + test("nested data - array of struct") { + val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i"))) + withParquetTable(data, "t") { + checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map { + case Tuple1(Seq((_, string))) => Row(string) + }) + } + } + + test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") { + withParquetTable((1 to 10).map(Tuple1.apply), "t") { + checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_))) + } + } + + test("SPARK-3536 regression: query empty Parquet file shouldn't throw") { + withTempDir { dir => + createParquetFile[(Int, String)](dir.toString).registerTempTable("t") + withTempTable("t") { + checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row]) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala new file mode 100644 index 0000000000..34d61bf908 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag + +import org.scalatest.FunSuite +import parquet.schema.MessageTypeParser + +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.types.{BinaryType, IntegerType, StructField, StructType} +import org.apache.spark.sql.test.TestSQLContext + +class ParquetSchemaSuite extends FunSuite with ParquetTest { + val sqlContext = TestSQLContext + + /** + * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`. + */ + private def testSchema[T <: Product: ClassTag: TypeTag]( + testName: String, messageType: String): Unit = { + test(testName) { + val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T]) + val expected = MessageTypeParser.parseMessageType(messageType) + actual.checkContains(expected) + expected.checkContains(actual) + } + } + + testSchema[(Boolean, Int, Long, Float, Double, Array[Byte])]( + "basic types", + """ + |message root { + | required boolean _1; + | required int32 _2; + | required int64 _3; + | required float _4; + | required double _5; + | optional binary _6; + |} + """.stripMargin) + + testSchema[(Byte, Short, Int, Long)]( + "logical integral types", + """ + |message root { + | required int32 _1 (INT_8); + | required int32 _2 (INT_16); + | required int32 _3 (INT_32); + | required int64 _4 (INT_64); + |} + """.stripMargin) + + // Currently String is the only supported logical binary type. + testSchema[Tuple1[String]]( + "binary logical types", + """ + |message root { + | optional binary _1 (UTF8); + |} + """.stripMargin) + + testSchema[Tuple1[Seq[Int]]]( + "array", + """ + |message root { + | optional group _1 (LIST) { + | repeated int32 array; + | } + |} + """.stripMargin) + + testSchema[Tuple1[Map[Int, String]]]( + "map", + """ + |message root { + | optional group _1 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required int32 key; + | optional binary value (UTF8); + | } + | } + |} + """.stripMargin) + + testSchema[Tuple1[Pair[Int, String]]]( + "struct", + """ + |message root { + | optional group _1 { + | required int32 _1; + | optional binary _2 (UTF8); + | } + |} + """.stripMargin) + + testSchema[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( + "deeply nested type", + """ + |message root { + | optional group _1 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required int32 key; + | optional group value { + | optional binary _1 (UTF8); + | optional group _2 (LIST) { + | repeated group bag { + | optional group array { + | required int32 _1; + | required double _2; + | } + | } + | } + | } + | } + | } + |} + """.stripMargin) + + testSchema[(Option[Int], Map[Int, Option[Double]])]( + "optional types", + """ + |message root { + | optional int32 _1; + | optional group _2 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required int32 key; + | optional double value; + | } + | } + |} + """.stripMargin) + + test("DataType string parser compatibility") { + val schema = StructType(List( + StructField("c1", IntegerType, false), + StructField("c2", BinaryType, true))) + + val fromCaseClassString = ParquetTypesConverter.convertFromString(schema.toString) + val fromJson = ParquetTypesConverter.convertFromString(schema.json) + + (fromCaseClassString, fromJson).zipped.foreach { (a, b) => + assert(a.name == b.name) + assert(a.dataType === 
b.dataType) + assert(a.nullable === b.nullable) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index 6f57fe8958..4bc14bad0a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -17,103 +17,66 @@ package org.apache.spark.sql.parquet -import java.io.File - -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} - -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} -import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType} -import org.apache.spark.sql.{parquet, SchemaRDD} -import org.apache.spark.util.Utils - -// Implicits -import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.hive.test.TestHive case class Cases(lower: String, UPPER: String) -class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach { - - val dirname = Utils.createTempDir() - - var testRDD: SchemaRDD = null - - override def beforeAll() { - // write test data - ParquetTestData.writeFile() - testRDD = parquetFile(ParquetTestData.testDir.toString) - testRDD.registerTempTable("testsource") - } - - override def afterAll() { - Utils.deleteRecursively(ParquetTestData.testDir) - Utils.deleteRecursively(dirname) - reset() // drop all tables that were registered as part of the tests - } - - // in case tests are failing we delete before and after each test - override def beforeEach() { - Utils.deleteRecursively(dirname) - } +class HiveParquetSuite extends QueryTest with ParquetTest { + val sqlContext = TestHive - override def afterEach() { - Utils.deleteRecursively(dirname) - } + import sqlContext._ test("Case insensitive attribute names") { - val tempFile = File.createTempFile("parquet", "") - tempFile.delete() - sparkContext.parallelize(1 to 10) - .map(_.toString) - .map(i => Cases(i, i)) - .saveAsParquetFile(tempFile.getCanonicalPath) - - parquetFile(tempFile.getCanonicalPath).registerTempTable("cases") - sql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString) - sql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString) + withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") { + val expected = (1 to 4).map(i => Row(i.toString)) + checkAnswer(sql("SELECT upper FROM cases"), expected) + checkAnswer(sql("SELECT LOWER FROM cases"), expected) + } } test("SELECT on Parquet table") { - val rdd = sql("SELECT * FROM testsource").collect() - assert(rdd != null) - assert(rdd.forall(_.size == 6)) + val data = (1 to 4).map(i => (i, s"val_$i")) + withParquetTable(data, "t") { + checkAnswer(sql("SELECT * FROM t"), data) + } } test("Simple column projection + filter on Parquet table") { - val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect() - assert(rdd.size === 5, "Filter returned incorrect number of rows") - assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value") + withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") { + checkAnswer( + sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"), + Seq(Row(true, "val_2"), Row(true, "val_4"))) + } } test("Converting Hive to Parquet Table via saveAsParquetFile") { - sql("SELECT * FROM 
src").saveAsParquetFile(dirname.getAbsolutePath) - parquetFile(dirname.getAbsolutePath).registerTempTable("ptable") - val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0)) - val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0)) - - compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String")) + withTempPath { dir => + sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath) + parquetFile(dir.getCanonicalPath).registerTempTable("p") + withTempTable("p") { + checkAnswer( + sql("SELECT * FROM src ORDER BY key"), + sql("SELECT * from p ORDER BY key").collect().toSeq) + } + } } - test("INSERT OVERWRITE TABLE Parquet table") { - sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath) - parquetFile(dirname.getAbsolutePath).registerTempTable("ptable") - // let's do three overwrites for good measure - sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() - sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() - sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() - val rddCopy = sql("SELECT * FROM ptable").collect() - val rddOrig = sql("SELECT * FROM testsource").collect() - assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??") - compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames) - } - private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) { - var counter = 0 - (rddOne, rddTwo).zipped.foreach { - (a,b) => (a,b).zipped.toArray.zipWithIndex.foreach { - case ((value_1, value_2), index) => - assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match") + test("INSERT OVERWRITE TABLE Parquet table") { + withParquetTable((1 to 4).map(i => (i, s"val_$i")), "t") { + withTempPath { file => + sql("SELECT * FROM t LIMIT 1").saveAsParquetFile(file.getCanonicalPath) + parquetFile(file.getCanonicalPath).registerTempTable("p") + withTempTable("p") { + // let's do three overwrites for good measure + sql("INSERT OVERWRITE TABLE p SELECT * FROM t") + sql("INSERT OVERWRITE TABLE p SELECT * FROM t") + sql("INSERT OVERWRITE TABLE p SELECT * FROM t") + checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq) + } } - counter = counter + 1 } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index 488ebba043..fc0e42c201 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -37,7 +37,7 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) * A suite to test the automatic conversion of metastore tables with parquet data to use the * built in parquet support. */ -class ParquetMetastoreSuite extends ParquetTest { +class ParquetMetastoreSuite extends ParquetPartitioningTest { override def beforeAll(): Unit = { super.beforeAll() @@ -112,7 +112,7 @@ class ParquetMetastoreSuite extends ParquetTest { /** * A suite of tests for the Parquet support through the data sources API. */ -class ParquetSourceSuite extends ParquetTest { +class ParquetSourceSuite extends ParquetPartitioningTest { override def beforeAll(): Unit = { super.beforeAll() @@ -145,7 +145,7 @@ class ParquetSourceSuite extends ParquetTest { /** * A collection of tests for parquet data with various forms of partitioning. 
*/ -abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { +abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll { var partitionedTableDir: File = null var partitionedTableDirWithKey: File = null -- cgit v1.2.3
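For reference, below is a minimal sketch of what a suite written against the new `ParquetTest` helpers might look like. It is not part of the patch: the suite name `ExampleParquetSuite` and the temporary table name "tmp" are made up for illustration, while the imports, the `sqlContext` binding, and the helper calls (`withParquetTable`, `withSQLConf`, `checkAnswer`) mirror the usage shown in `ParquetQuerySuite2` and `ParquetFilterSuite` above.

package org.apache.spark.sql.parquet

import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.{QueryTest, SQLConf}

// Hypothetical suite illustrating the ParquetTest API; not included in this patch.
class ExampleParquetSuite extends QueryTest with ParquetTest {
  // ParquetTest only requires an SQLContext, so core suites can plug in
  // TestSQLContext while Hive suites plug in TestHive.
  val sqlContext = TestSQLContext

  test("round-trip through a temporary Parquet table") {
    // Tuples are used instead of dedicated case classes, as suggested in the
    // ParquetTest Scaladoc. The temporary table and its backing Parquet file
    // are dropped/deleted automatically once the closure returns.
    withParquetTable((1 to 3).map(i => (i, s"val_$i")), "tmp") {
      checkAnswer(sql("SELECT _2 FROM tmp WHERE _1 = 2"), Seq(Row("val_2")))
    }
  }

  test("query with filter pushdown enabled") {
    // withSQLConf restores (or unsets) the original configuration afterwards,
    // even if the body throws.
    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
      withParquetTable((1 to 3).map(Tuple1.apply), "tmp") {
        checkAnswer(sql("SELECT _1 FROM tmp WHERE _1 < 3"), Seq(Row(1), Row(2)))
      }
    }
  }
}

Compared with the old ParquetQuerySuite, each test case above carries no explicit setup or teardown code; resource lifetime is tied to the closure passed into the corresponding helper.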