author     Cheng Lian <lian@databricks.com>     2015-01-21 14:38:10 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-01-21 14:38:10 -0800
commit     ba19689fe77b90052b587640c9ff325c5a892c20 (patch)
tree       19c81c79c506e6b8c05fa60631ad865c6ccb87f3
parent     b328ac6c8c489ef9abf850c45db5ad531da18d55 (diff)
[SQL] [Minor] Remove deprecated parquet tests
This PR removes the deprecated `ParquetQuerySuite`, renames `ParquetQuerySuite2` to `ParquetQuerySuite`, and refactors the `ParquetFilterSuite` changes introduced in #4115. It is a follow-up of #3644. Note that the test cases in the old `ParquetQuerySuite` are already well covered by other test suites introduced in #3644.

Author: Cheng Lian <lian@databricks.com>

Closes #4116 from liancheng/remove-deprecated-parquet-tests and squashes the following commits:

f73b8f9 [Cheng Lian] Removes deprecated Parquet test suite
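The core of the `ParquetFilterSuite` refactoring is the new `checkFilterPredicate` helper: the `SchemaRDD` becomes an implicit parameter and the projected columns are collected from the predicate itself, so individual assertions shrink considerably. The sketch below is assembled from the diff that follows and only contrasts one call site before and after the change; the surrounding `withParquetRDD` test body is shown purely for illustration.

// Before this PR: the SchemaRDD and the projected columns are passed
// explicitly, and the expected result sits in a trailing argument list.
withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { rdd =>
  checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
}

// After this PR: the SchemaRDD is implicit and the referenced attributes are
// collected from the predicate, so a call site only states the predicate, the
// expected Parquet FilterPredicate class, and the expected result.
withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd =>
  checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
}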
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala  |  373
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala   | 1040
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala  |   88
3 files changed, 212 insertions(+), 1289 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 4ad8c47200..1e7d3e06fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -21,7 +21,7 @@ import parquet.filter2.predicate.Operators._
import parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal, Predicate, Row}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
@@ -40,15 +40,16 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
class ParquetFilterSuite extends QueryTest with ParquetTest {
val sqlContext = TestSQLContext
- private def checkFilterPushdown(
+ private def checkFilterPredicate(
rdd: SchemaRDD,
- output: Seq[Symbol],
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
- checker: (SchemaRDD, Any) => Unit,
- expectedResult: Any): Unit = {
+ checker: (SchemaRDD, Seq[Row]) => Unit,
+ expected: Seq[Row]): Unit = {
+ val output = predicate.collect { case a: Attribute => a }.distinct
+
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
- val query = rdd.select(output.map(_.attr): _*).where(predicate)
+ val query = rdd.select(output: _*).where(predicate)
val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect {
case plan: ParquetTableScan => plan.columnPruningPred
@@ -58,209 +59,180 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
maybeAnalyzedPredicate.foreach { pred =>
val maybeFilter = ParquetFilters.createFilter(pred)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
- maybeFilter.foreach(f => assert(f.getClass === filterClass))
+ maybeFilter.foreach { f =>
+ // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+ assert(f.getClass === filterClass)
+ }
}
- checker(query, expectedResult)
+ checker(query, expected)
}
}
- private def checkFilterPushdown1
- (rdd: SchemaRDD, output: Symbol*)
- (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
- (expectedResult: => Seq[Row]): Unit = {
- checkFilterPushdown(rdd, output, predicate, filterClass,
- (query, expected) => checkAnswer(query, expected.asInstanceOf[Seq[Row]]), expectedResult)
- }
-
- private def checkFilterPushdown
- (rdd: SchemaRDD, output: Symbol*)
- (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
- (expectedResult: Int): Unit = {
- checkFilterPushdown(rdd, output, predicate, filterClass,
- (query, expected) => checkAnswer(query, expected.asInstanceOf[Seq[Row]]), Seq(Row(expectedResult)))
+ private def checkFilterPredicate
+ (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
+ (implicit rdd: SchemaRDD): Unit = {
+ checkFilterPredicate(rdd, predicate, filterClass, checkAnswer(_, _: Seq[Row]), expected)
}
- def checkBinaryFilterPushdown
- (rdd: SchemaRDD, output: Symbol*)
- (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
- (expectedResult: => Any): Unit = {
- def checkBinaryAnswer(rdd: SchemaRDD, result: Any): Unit = {
- val actual = rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq
- val expected = result match {
- case s: Seq[_] => s.map(_.asInstanceOf[Row].getAs[Array[Byte]](0).mkString(","))
- case s => Seq(s.asInstanceOf[Array[Byte]].mkString(","))
- }
- assert(actual.sorted === expected.sorted)
- }
- checkFilterPushdown(rdd, output, predicate, filterClass, checkBinaryAnswer _, expectedResult)
+ private def checkFilterPredicate[T]
+ (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: T)
+ (implicit rdd: SchemaRDD): Unit = {
+ checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
}
test("filter pushdown - boolean") {
- withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { rdd =>
- checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Boolean]])(Seq.empty[Row])
- checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Boolean]]) {
- Seq(Row(true), Row(false))
- }
+ withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq [_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
- checkFilterPushdown1(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(Seq(Row(true)))
- checkFilterPushdown1(rdd, '_1)('_1 !== true, classOf[Operators.NotEq[java.lang.Boolean]])(Seq(Row(false)))
+ checkFilterPredicate('_1 === true, classOf[Eq [_]], true)
+ checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
}
}
test("filter pushdown - integer") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { rdd =>
- checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[Integer]])(Seq.empty[Row])
- checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[Integer]]) {
- (1 to 4).map(Row.apply(_))
- }
-
- checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
- checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[Integer]]) {
- (2 to 4).map(Row.apply(_))
- }
-
- checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [Integer]])(1)
- checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [Integer]])(4)
- checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[Integer]])(1)
- checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[Integer]])(4)
-
- checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [Integer]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [Integer]])(4)
- checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[Integer]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[Integer]])(4)
-
- checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[Integer]])(4)
- checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
- checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
- Seq(Row(1), Row(4))
- }
+ withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq [_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 === 1, classOf[Eq [_]], 1)
+ checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 < 2, classOf[Lt [_]], 1)
+ checkFilterPredicate('_1 > 3, classOf[Gt [_]], 4)
+ checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(Literal(1) === '_1, classOf[Eq [_]], 1)
+ checkFilterPredicate(Literal(2) > '_1, classOf[Lt [_]], 1)
+ checkFilterPredicate(Literal(3) < '_1, classOf[Gt [_]], 4)
+ checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+ checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
test("filter pushdown - long") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { rdd =>
- checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Long]])(Seq.empty[Row])
- checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Long]]) {
- (1 to 4).map(Row.apply(_))
- }
-
- checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1)
- checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Long]]) {
- (2 to 4).map(Row.apply(_))
- }
-
- checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Long]])(1)
- checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Long]])(4)
- checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Long]])(1)
- checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Long]])(4)
-
- checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Long]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Long]])(4)
- checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Long]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Long]])(4)
-
- checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Long]])(4)
- checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
- checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
- Seq(Row(1), Row(4))
- }
+ withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq [_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 < 2, classOf[Lt [_]], 1)
+ checkFilterPredicate('_1 > 3, classOf[Gt [_]], 4)
+ checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(Literal(1) === '_1, classOf[Eq [_]], 1)
+ checkFilterPredicate(Literal(2) > '_1, classOf[Lt [_]], 1)
+ checkFilterPredicate(Literal(3) < '_1, classOf[Gt [_]], 4)
+ checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+ checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
test("filter pushdown - float") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { rdd =>
- checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Float]])(Seq.empty[Row])
- checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Float]]) {
- (1 to 4).map(Row.apply(_))
- }
-
- checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1)
- checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Float]]) {
- (2 to 4).map(Row.apply(_))
- }
-
- checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Float]])(1)
- checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Float]])(4)
- checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Float]])(1)
- checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Float]])(4)
-
- checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Float]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Float]])(4)
- checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Float]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Float]])(4)
-
- checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Float]])(4)
- checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
- checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
- Seq(Row(1), Row(4))
- }
+ withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq [_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 === 1, classOf[Eq [_]], 1)
+ checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 < 2, classOf[Lt [_]], 1)
+ checkFilterPredicate('_1 > 3, classOf[Gt [_]], 4)
+ checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(Literal(1) === '_1, classOf[Eq [_]], 1)
+ checkFilterPredicate(Literal(2) > '_1, classOf[Lt [_]], 1)
+ checkFilterPredicate(Literal(3) < '_1, classOf[Gt [_]], 4)
+ checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+ checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
test("filter pushdown - double") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { rdd =>
- checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Double]])(Seq.empty[Row])
- checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Double]]) {
- (1 to 4).map(Row.apply(_))
- }
-
- checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1)
- checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Double]]) {
- (2 to 4).map(Row.apply(_))
- }
-
- checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Double]])(1)
- checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Double]])(4)
- checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Double]])(1)
- checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Double]])(4)
-
- checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq[Integer]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Double]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Double]])(4)
- checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Double]])(1)
- checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Double]])(4)
-
- checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Double]])(4)
- checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
- checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
- Seq(Row(1), Row(4))
- }
+ withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 === 1, classOf[Eq [_]], 1)
+ checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 < 2, classOf[Lt [_]], 1)
+ checkFilterPredicate('_1 > 3, classOf[Gt [_]], 4)
+ checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(Literal(1) === '_1, classOf[Eq [_]], 1)
+ checkFilterPredicate(Literal(2) > '_1, classOf[Lt [_]], 1)
+ checkFilterPredicate(Literal(3) < '_1, classOf[Gt [_]], 4)
+ checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+ checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
test("filter pushdown - string") {
- withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { rdd =>
- checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.String]])(Seq.empty[Row])
- checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.String]]) {
- (1 to 4).map(i => Row.apply(i.toString))
- }
-
- checkFilterPushdown1(rdd, '_1)('_1 === "1", classOf[Eq[String]])(Seq(Row("1")))
- checkFilterPushdown1(rdd, '_1)('_1 !== "1", classOf[Operators.NotEq[String]]) {
- (2 to 4).map(i => Row.apply(i.toString))
- }
+ withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(
+ '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString)))
+
+ checkFilterPredicate('_1 === "1", classOf[Eq [_]], "1")
+ checkFilterPredicate('_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString)))
+
+ checkFilterPredicate('_1 < "2", classOf[Lt [_]], "1")
+ checkFilterPredicate('_1 > "3", classOf[Gt [_]], "4")
+ checkFilterPredicate('_1 <= "1", classOf[LtEq[_]], "1")
+ checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
+
+ checkFilterPredicate(Literal("1") === '_1, classOf[Eq [_]], "1")
+ checkFilterPredicate(Literal("2") > '_1, classOf[Lt [_]], "1")
+ checkFilterPredicate(Literal("3") < '_1, classOf[Gt [_]], "4")
+ checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1")
+ checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4")
+
+ checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
+ checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3")
+ checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4")))
+ }
+ }
- checkFilterPushdown1(rdd, '_1)('_1 < "2", classOf[Lt [java.lang.String]])(Seq(Row("1")))
- checkFilterPushdown1(rdd, '_1)('_1 > "3", classOf[Gt [java.lang.String]])(Seq(Row("4")))
- checkFilterPushdown1(rdd, '_1)('_1 <= "1", classOf[LtEq[java.lang.String]])(Seq(Row("1")))
- checkFilterPushdown1(rdd, '_1)('_1 >= "4", classOf[GtEq[java.lang.String]])(Seq(Row("4")))
-
- checkFilterPushdown1(rdd, '_1)(Literal("1") === '_1, classOf[Eq [java.lang.String]])(Seq(Row("1")))
- checkFilterPushdown1(rdd, '_1)(Literal("2") > '_1, classOf[Lt [java.lang.String]])(Seq(Row("1")))
- checkFilterPushdown1(rdd, '_1)(Literal("3") < '_1, classOf[Gt [java.lang.String]])(Seq(Row("4")))
- checkFilterPushdown1(rdd, '_1)(Literal("1") >= '_1, classOf[LtEq[java.lang.String]])(Seq(Row("1")))
- checkFilterPushdown1(rdd, '_1)(Literal("4") <= '_1, classOf[GtEq[java.lang.String]])(Seq(Row("4")))
-
- checkFilterPushdown1(rdd, '_1)(!('_1 < "4"), classOf[Operators.GtEq[java.lang.String]])(Seq(Row("4")))
- checkFilterPushdown1(rdd, '_1)('_1 > "2" && '_1 < "4", classOf[Operators.And])(Seq(Row("3")))
- checkFilterPushdown1(rdd, '_1)('_1 < "2" || '_1 > "3", classOf[Operators.Or]) {
- Seq(Row("1"), Row("4"))
+ def checkBinaryFilterPredicate
+ (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
+ (implicit rdd: SchemaRDD): Unit = {
+ def checkBinaryAnswer(rdd: SchemaRDD, expected: Seq[Row]) = {
+ assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).toSeq.sorted) {
+ rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
}
}
+
+ checkFilterPredicate(rdd, predicate, filterClass, checkBinaryAnswer _, expected)
+ }
+
+ def checkBinaryFilterPredicate
+ (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte])
+ (implicit rdd: SchemaRDD): Unit = {
+ checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
}
test("filter pushdown - binary") {
@@ -268,33 +240,30 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
def b: Array[Byte] = int.toString.getBytes("UTF-8")
}
- withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { rdd =>
- checkBinaryFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.String]])(Seq.empty[Row])
- checkBinaryFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.String]]) {
- (1 to 4).map(i => Row.apply(i.b)).toSeq
- }
-
- checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b)
- checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.NotEq[Array[Byte]]]) {
- (2 to 4).map(i => Row.apply(i.b)).toSeq
- }
-
- checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b, classOf[Lt [Array[Byte]]])(1.b)
- checkBinaryFilterPushdown(rdd, '_1)('_1 > 3.b, classOf[Gt [Array[Byte]]])(4.b)
- checkBinaryFilterPushdown(rdd, '_1)('_1 <= 1.b, classOf[LtEq[Array[Byte]]])(1.b)
- checkBinaryFilterPushdown(rdd, '_1)('_1 >= 4.b, classOf[GtEq[Array[Byte]]])(4.b)
-
- checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) === '_1, classOf[Eq [Array[Byte]]])(1.b)
- checkBinaryFilterPushdown(rdd, '_1)(Literal(2.b) > '_1, classOf[Lt [Array[Byte]]])(1.b)
- checkBinaryFilterPushdown(rdd, '_1)(Literal(3.b) < '_1, classOf[Gt [Array[Byte]]])(4.b)
- checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) >= '_1, classOf[LtEq[Array[Byte]]])(1.b)
- checkBinaryFilterPushdown(rdd, '_1)(Literal(4.b) <= '_1, classOf[GtEq[Array[Byte]]])(4.b)
-
- checkBinaryFilterPushdown(rdd, '_1)(!('_1 < 4.b), classOf[Operators.GtEq[Array[Byte]]])(4.b)
- checkBinaryFilterPushdown(rdd, '_1)('_1 > 2.b && '_1 < 4.b, classOf[Operators.And])(3.b)
- checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b || '_1 > 3.b, classOf[Operators.Or]) {
- Seq(Row(1.b), Row(4.b))
- }
+ withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { implicit rdd =>
+ checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkBinaryFilterPredicate(
+ '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.b)).toSeq)
+
+ checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq [_]], 1.b)
+ checkBinaryFilterPredicate(
+ '_1 !== 1.b, classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.b)).toSeq)
+
+ checkBinaryFilterPredicate('_1 < 2.b, classOf[Lt [_]], 1.b)
+ checkBinaryFilterPredicate('_1 > 3.b, classOf[Gt [_]], 4.b)
+ checkBinaryFilterPredicate('_1 <= 1.b, classOf[LtEq[_]], 1.b)
+ checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
+
+ checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq [_]], 1.b)
+ checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt [_]], 1.b)
+ checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt [_]], 4.b)
+ checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b)
+ checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b)
+
+ checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
+ checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, classOf[Operators.And], 3.b)
+ checkBinaryFilterPredicate(
+ '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 2c5345b1f9..1263ff818e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,1030 +17,72 @@
package org.apache.spark.sql.parquet
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
-import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
-import parquet.filter2.predicate.{FilterPredicate, Operators}
-import parquet.hadoop.ParquetFileWriter
-import parquet.hadoop.util.ContextUtil
-import parquet.io.api.Binary
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Row => _, _}
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
-case class TestRDDEntry(key: Int, value: String)
-
-case class NullReflectData(
- intField: java.lang.Integer,
- longField: java.lang.Long,
- floatField: java.lang.Float,
- doubleField: java.lang.Double,
- booleanField: java.lang.Boolean)
-
-case class OptionalReflectData(
- intField: Option[Int],
- longField: Option[Long],
- floatField: Option[Float],
- doubleField: Option[Double],
- booleanField: Option[Boolean])
-
-case class Nested(i: Int, s: String)
-
-case class Data(array: Seq[Int], nested: Nested)
-
-case class AllDataTypes(
- stringField: String,
- intField: Int,
- longField: Long,
- floatField: Float,
- doubleField: Double,
- shortField: Short,
- byteField: Byte,
- booleanField: Boolean)
-
-case class AllDataTypesWithNonPrimitiveType(
- stringField: String,
- intField: Int,
- longField: Long,
- floatField: Float,
- doubleField: Double,
- shortField: Short,
- byteField: Byte,
- booleanField: Boolean,
- array: Seq[Int],
- arrayContainsNull: Seq[Option[Int]],
- map: Map[Int, Long],
- mapValueContainsNull: Map[Int, Option[Long]],
- data: Data)
-
-case class BinaryData(binaryData: Array[Byte])
-
-case class NumericData(i: Int, d: Double)
-class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
- TestData // Load test data tables.
-
- private var testRDD: SchemaRDD = null
- private val originalParquetFilterPushdownEnabled = TestSQLContext.conf.parquetFilterPushDown
-
- override def beforeAll() {
- ParquetTestData.writeFile()
- ParquetTestData.writeFilterFile()
- ParquetTestData.writeNestedFile1()
- ParquetTestData.writeNestedFile2()
- ParquetTestData.writeNestedFile3()
- ParquetTestData.writeNestedFile4()
- ParquetTestData.writeGlobFiles()
- testRDD = parquetFile(ParquetTestData.testDir.toString)
- testRDD.registerTempTable("testsource")
- parquetFile(ParquetTestData.testFilterDir.toString)
- .registerTempTable("testfiltersource")
-
- setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
- }
-
- override def afterAll() {
- Utils.deleteRecursively(ParquetTestData.testDir)
- Utils.deleteRecursively(ParquetTestData.testFilterDir)
- Utils.deleteRecursively(ParquetTestData.testNestedDir1)
- Utils.deleteRecursively(ParquetTestData.testNestedDir2)
- Utils.deleteRecursively(ParquetTestData.testNestedDir3)
- Utils.deleteRecursively(ParquetTestData.testNestedDir4)
- Utils.deleteRecursively(ParquetTestData.testGlobDir)
- // here we should also unregister the table??
-
- setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, originalParquetFilterPushdownEnabled.toString)
- }
+/**
+ * A test suite that tests various Parquet queries.
+ */
+class ParquetQuerySuite extends QueryTest with ParquetTest {
+ val sqlContext = TestSQLContext
- test("Read/Write All Types") {
- val tempDir = getTempFilePath("parquetTest").getCanonicalPath
- val range = (0 to 255)
- val data = sparkContext.parallelize(range).map { x =>
- parquet.AllDataTypes(
- s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0)
+ test("simple projection") {
+ withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
+ checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
}
-
- data.saveAsParquetFile(tempDir)
-
- checkAnswer(
- parquetFile(tempDir),
- data.toSchemaRDD.collect().toSeq)
}
- test("read/write binary data") {
- // Since equality for Array[Byte] is broken we test this separately.
- val tempDir = getTempFilePath("parquetTest").getCanonicalPath
- sparkContext.parallelize(BinaryData("test".getBytes("utf8")) :: Nil).saveAsParquetFile(tempDir)
- parquetFile(tempDir)
- .map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8"))
- .collect().toSeq == Seq("test")
- }
-
- ignore("Treat binary as string") {
- val oldIsParquetBinaryAsString = TestSQLContext.conf.isParquetBinaryAsString
-
- // Create the test file.
- val file = getTempFilePath("parquet")
- val path = file.toString
- val range = (0 to 255)
- val rowRDD = TestSQLContext.sparkContext.parallelize(range)
- .map(i => org.apache.spark.sql.Row(i, s"val_$i".getBytes))
- // We need to ask Parquet to store the String column as a Binary column.
- val schema = StructType(
- StructField("c1", IntegerType, false) ::
- StructField("c2", BinaryType, false) :: Nil)
- val schemaRDD1 = applySchema(rowRDD, schema)
- schemaRDD1.saveAsParquetFile(path)
- checkAnswer(
- parquetFile(path).select('c1, 'c2.cast(StringType)),
- schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)
-
- setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
- parquetFile(path).printSchema()
- checkAnswer(
- parquetFile(path),
- schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)
-
-
- // Set it back.
- TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
- }
-
- test("Compression options for writing to a Parquetfile") {
- val defaultParquetCompressionCodec = TestSQLContext.conf.parquetCompressionCodec
- import scala.collection.JavaConversions._
-
- val file = getTempFilePath("parquet")
- val path = file.toString
- val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
- .map(i => TestRDDEntry(i, s"val_$i"))
-
- // test default compression codec
- rdd.saveAsParquetFile(path)
- var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
- .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
- assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
-
- parquetFile(path).registerTempTable("tmp")
- checkAnswer(
- sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
- Row(5, "val_5") ::
- Row(7, "val_7") :: Nil)
-
- Utils.deleteRecursively(file)
-
- // test uncompressed parquet file with property value "UNCOMPRESSED"
- TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED")
-
- rdd.saveAsParquetFile(path)
- actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
- .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
- assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
-
- parquetFile(path).registerTempTable("tmp")
- checkAnswer(
- sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
- Row(5, "val_5") ::
- Row(7, "val_7") :: Nil)
-
- Utils.deleteRecursively(file)
-
- // test uncompressed parquet file with property value "none"
- TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none")
-
- rdd.saveAsParquetFile(path)
- actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
- .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
- assert(actualCodec === "UNCOMPRESSED" :: Nil)
-
- parquetFile(path).registerTempTable("tmp")
- checkAnswer(
- sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
- Row(5, "val_5") ::
- Row(7, "val_7") :: Nil)
-
- Utils.deleteRecursively(file)
-
- // test gzip compression codec
- TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
-
- rdd.saveAsParquetFile(path)
- actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
- .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
- assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
-
- parquetFile(path).registerTempTable("tmp")
- checkAnswer(
- sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
- Row(5, "val_5") ::
- Row(7, "val_7") :: Nil)
-
- Utils.deleteRecursively(file)
-
- // test snappy compression codec
- TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy")
-
- rdd.saveAsParquetFile(path)
- actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
- .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
- assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
-
- parquetFile(path).registerTempTable("tmp")
- checkAnswer(
- sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
- Row(5, "val_5") ::
- Row(7, "val_7") :: Nil)
-
- Utils.deleteRecursively(file)
-
- // TODO: Lzo requires additional external setup steps so leave it out for now
- // ref.: https://github.com/Parquet/parquet-mr/blob/parquet-1.5.0/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java#L169
-
- // Set it back.
- TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, defaultParquetCompressionCodec)
- }
-
- test("Read/Write All Types with non-primitive type") {
- val tempDir = getTempFilePath("parquetTest").getCanonicalPath
- val range = (0 to 255)
- val data = sparkContext.parallelize(range).map { x =>
- parquet.AllDataTypesWithNonPrimitiveType(
- s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
- (0 until x),
- (0 until x).map(Option(_).filter(_ % 3 == 0)),
- (0 until x).map(i => i -> i.toLong).toMap,
- (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None),
- parquet.Data((0 until x), parquet.Nested(x, s"$x")))
+ test("appending") {
+ val data = (0 until 10).map(i => (i, i.toString))
+ withParquetTable(data, "t") {
+ sql("INSERT INTO t SELECT * FROM t")
+ checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
- data.saveAsParquetFile(tempDir)
-
- checkAnswer(
- parquetFile(tempDir),
- data.toSchemaRDD.collect().toSeq)
}
- test("self-join parquet files") {
- val x = ParquetTestData.testData.as('x)
- val y = ParquetTestData.testData.as('y)
- val query = x.join(y).where("x.myint".attr === "y.myint".attr)
-
- // Check to make sure that the attributes from either side of the join have unique expression
- // ids.
- query.queryExecution.analyzed.output.filter(_.name == "myint") match {
- case Seq(i1, i2) if(i1.exprId == i2.exprId) =>
- fail(s"Duplicate expression IDs found in query plan: $query")
- case Seq(_, _) => // All good
+ test("self-join") {
+ // 4 rows, cells of column 1 of row 2 and row 4 are null
+ val data = (1 to 4).map { i =>
+ val maybeInt = if (i % 2 == 0) None else Some(i)
+ (maybeInt, i.toString)
}
- val result = query.collect()
- assert(result.size === 9, "self-join result has incorrect size")
- assert(result(0).size === 12, "result row has incorrect size")
- result.zipWithIndex.foreach {
- case (row, index) => row.toSeq.zipWithIndex.foreach {
- case (field, column) => assert(field != null, s"self-join contains null value in row $index field $column")
- }
- }
- }
+ withParquetTable(data, "t") {
+ val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
+ val queryOutput = selfJoin.queryExecution.analyzed.output
- test("Import of simple Parquet file") {
- val result = parquetFile(ParquetTestData.testDir.toString).collect()
- assert(result.size === 15)
- result.zipWithIndex.foreach {
- case (row, index) => {
- val checkBoolean =
- if (index % 3 == 0)
- row(0) == true
- else
- row(0) == false
- assert(checkBoolean === true, s"boolean field value in line $index did not match")
- if (index % 5 == 0) assert(row(1) === 5, s"int field value in line $index did not match")
- assert(row(2) === "abc", s"string field value in line $index did not match")
- assert(row(3) === (index.toLong << 33), s"long value in line $index did not match")
- assert(row(4) === 2.5F, s"float field value in line $index did not match")
- assert(row(5) === 4.5D, s"double field value in line $index did not match")
+ assertResult(4, s"Field count mismatches")(queryOutput.size)
+ assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
+ queryOutput.filter(_.name == "_1").map(_.exprId).size
}
- }
- }
- test("Projection of simple Parquet file") {
- val result = ParquetTestData.testData.select('myboolean, 'mylong).collect()
- result.zipWithIndex.foreach {
- case (row, index) => {
- if (index % 3 == 0)
- assert(row(0) === true, s"boolean field value in line $index did not match (every third row)")
- else
- assert(row(0) === false, s"boolean field value in line $index did not match")
- assert(row(1) === (index.toLong << 33), s"long field value in line $index did not match")
- assert(row.size === 2, s"number of columns in projection in line $index is incorrect")
- }
+ checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
}
}
- test("Writing metadata from scratch for table CREATE") {
- val job = new Job()
- val path = new Path(getTempFilePath("testtable").getCanonicalFile.toURI.toString)
- val fs: FileSystem = FileSystem.getLocal(ContextUtil.getConfiguration(job))
- ParquetTypesConverter.writeMetaData(
- ParquetTestData.testData.output,
- path,
- TestSQLContext.sparkContext.hadoopConfiguration)
- assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
- val metaData = ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job)))
- assert(metaData != null)
- ParquetTestData
- .testData
- .parquetSchema
- .checkContains(metaData.getFileMetaData.getSchema) // throws exception if incompatible
- metaData
- .getFileMetaData
- .getSchema
- .checkContains(ParquetTestData.testData.parquetSchema) // throws exception if incompatible
- fs.delete(path, true)
- }
-
- test("Creating case class RDD table") {
- TestSQLContext.sparkContext.parallelize((1 to 100))
- .map(i => TestRDDEntry(i, s"val_$i"))
- .registerTempTable("tmp")
- val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0))
- var counter = 1
- rdd.foreach {
- // '===' does not like string comparison?
- row: Row => {
- assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter")
- counter = counter + 1
- }
+ test("nested data - struct with array field") {
+ val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
+ case Tuple1((_, Seq(string))) => Row(string)
+ })
}
}
- test("Read a parquet file instead of a directory") {
- val file = getTempFilePath("parquet")
- val path = file.toString
- val fsPath = new Path(path)
- val fs: FileSystem = fsPath.getFileSystem(TestSQLContext.sparkContext.hadoopConfiguration)
- val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
- .map(i => TestRDDEntry(i, s"val_$i"))
- rdd.coalesce(1).saveAsParquetFile(path)
-
- val children = fs.listStatus(fsPath).filter(_.getPath.getName.endsWith(".parquet"))
- assert(children.length > 0)
- val readFile = parquetFile(path + "/" + children(0).getPath.getName)
- readFile.registerTempTable("tmpx")
- val rdd_copy = sql("SELECT * FROM tmpx").collect()
- val rdd_orig = rdd.collect()
- for(i <- 0 to 99) {
- assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
- assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+ test("nested data - array of struct") {
+ val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
+ case Tuple1(Seq((_, string))) => Row(string)
+ })
}
- Utils.deleteRecursively(file)
- }
-
- test("Insert (appending) to same table via Scala API") {
- sql("INSERT INTO testsource SELECT * FROM testsource")
- val double_rdd = sql("SELECT * FROM testsource").collect()
- assert(double_rdd != null)
- assert(double_rdd.size === 30)
-
- // let's restore the original test data
- Utils.deleteRecursively(ParquetTestData.testDir)
- ParquetTestData.writeFile()
- }
-
- test("save and load case class RDD with nulls as parquet") {
- val data = parquet.NullReflectData(null, null, null, null, null)
- val rdd = sparkContext.parallelize(data :: Nil)
-
- val file = getTempFilePath("parquet")
- val path = file.toString
- rdd.saveAsParquetFile(path)
- val readFile = parquetFile(path)
-
- val rdd_saved = readFile.collect()
- assert(rdd_saved(0) === Row(null, null, null, null, null))
- Utils.deleteRecursively(file)
- assert(true)
- }
-
- test("save and load case class RDD with Nones as parquet") {
- val data = parquet.OptionalReflectData(None, None, None, None, None)
- val rdd = sparkContext.parallelize(data :: Nil)
-
- val file = getTempFilePath("parquet")
- val path = file.toString
- rdd.saveAsParquetFile(path)
- val readFile = parquetFile(path)
-
- val rdd_saved = readFile.collect()
- assert(rdd_saved(0) === Row(null, null, null, null, null))
- Utils.deleteRecursively(file)
- assert(true)
- }
-
- test("make RecordFilter for simple predicates") {
- def checkFilter[T <: FilterPredicate : ClassTag](
- predicate: Expression,
- defined: Boolean = true): Unit = {
- val filter = ParquetFilters.createFilter(predicate)
- if (defined) {
- assert(filter.isDefined)
- val tClass = implicitly[ClassTag[T]].runtimeClass
- val filterGet = filter.get
- assert(
- tClass.isInstance(filterGet),
- s"$filterGet of type ${filterGet.getClass} is not an instance of $tClass")
- } else {
- assert(filter.isEmpty)
- }
- }
-
- checkFilter[Operators.Eq[Integer]]('a.int === 1)
- checkFilter[Operators.Eq[Integer]](Literal(1) === 'a.int)
-
- checkFilter[Operators.Lt[Integer]]('a.int < 4)
- checkFilter[Operators.Lt[Integer]](Literal(4) > 'a.int)
- checkFilter[Operators.LtEq[Integer]]('a.int <= 4)
- checkFilter[Operators.LtEq[Integer]](Literal(4) >= 'a.int)
-
- checkFilter[Operators.Gt[Integer]]('a.int > 4)
- checkFilter[Operators.Gt[Integer]](Literal(4) < 'a.int)
- checkFilter[Operators.GtEq[Integer]]('a.int >= 4)
- checkFilter[Operators.GtEq[Integer]](Literal(4) <= 'a.int)
-
- checkFilter[Operators.And]('a.int === 1 && 'a.int < 4)
- checkFilter[Operators.Or]('a.int === 1 || 'a.int < 4)
- checkFilter[Operators.NotEq[Integer]](!('a.int === 1))
-
- checkFilter('a.int > 'b.int, defined = false)
- checkFilter(('a.int > 'b.int) && ('a.int > 'b.int), defined = false)
- }
-
- test("test filter by predicate pushdown") {
- for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) {
- val query1 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100")
- assert(
- query1.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result1 = query1.collect()
- assert(result1.size === 50)
- assert(result1(0)(1) === 100)
- assert(result1(49)(1) === 149)
- val query2 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200")
- assert(
- query2.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result2 = query2.collect()
- assert(result2.size === 50)
- if (myval == "myint" || myval == "mylong") {
- assert(result2(0)(1) === 151)
- assert(result2(49)(1) === 200)
- } else {
- assert(result2(0)(1) === 150)
- assert(result2(49)(1) === 199)
- }
- }
- for(myval <- Seq("myint", "mylong")) {
- val query3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10")
- assert(
- query3.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result3 = query3.collect()
- assert(result3.size === 20)
- assert(result3(0)(1) === 0)
- assert(result3(9)(1) === 9)
- assert(result3(10)(1) === 191)
- assert(result3(19)(1) === 200)
- }
- for(myval <- Seq("mydouble", "myfloat")) {
- val result4 =
- if (myval == "mydouble") {
- val query4 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10.0")
- assert(
- query4.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- query4.collect()
- } else {
- // CASTs are problematic. Here myfloat will be casted to a double and it seems there is
- // currently no way to specify float constants in SqlParser?
- sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10").collect()
- }
- assert(result4.size === 20)
- assert(result4(0)(1) === 0)
- assert(result4(9)(1) === 9)
- assert(result4(10)(1) === 191)
- assert(result4(19)(1) === 200)
- }
- val query5 = sql(s"SELECT * FROM testfiltersource WHERE myboolean = true AND myint < 40")
- assert(
- query5.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val booleanResult = query5.collect()
- assert(booleanResult.size === 10)
- for(i <- 0 until 10) {
- if (!booleanResult(i).getBoolean(0)) {
- fail(s"Boolean value in result row $i not true")
- }
- if (booleanResult(i).getInt(1) != i * 4) {
- fail(s"Int value in result row $i should be ${4*i}")
- }
- }
- val query6 = sql("SELECT * FROM testfiltersource WHERE mystring = \"100\"")
- assert(
- query6.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val stringResult = query6.collect()
- assert(stringResult.size === 1)
- assert(stringResult(0).getString(2) == "100", "stringvalue incorrect")
- assert(stringResult(0).getInt(1) === 100)
-
- val query7 = sql(s"SELECT * FROM testfiltersource WHERE myoptint < 40")
- assert(
- query7.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val optResult = query7.collect()
- assert(optResult.size === 20)
- for(i <- 0 until 20) {
- if (optResult(i)(7) != i * 2) {
- fail(s"optional Int value in result row $i should be ${2*4*i}")
- }
- }
- for(myval <- Seq("myoptint", "myoptlong", "myoptdouble", "myoptfloat")) {
- val query8 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100")
- assert(
- query8.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result8 = query8.collect()
- assert(result8.size === 25)
- assert(result8(0)(7) === 100)
- assert(result8(24)(7) === 148)
- val query9 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200")
- assert(
- query9.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result9 = query9.collect()
- assert(result9.size === 25)
- if (myval == "myoptint" || myval == "myoptlong") {
- assert(result9(0)(7) === 152)
- assert(result9(24)(7) === 200)
- } else {
- assert(result9(0)(7) === 150)
- assert(result9(24)(7) === 198)
- }
- }
- val query10 = sql("SELECT * FROM testfiltersource WHERE myoptstring = \"100\"")
- assert(
- query10.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result10 = query10.collect()
- assert(result10.size === 1)
- assert(result10(0).getString(8) == "100", "stringvalue incorrect")
- assert(result10(0).getInt(7) === 100)
- val query11 = sql(s"SELECT * FROM testfiltersource WHERE myoptboolean = true AND myoptint < 40")
- assert(
- query11.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result11 = query11.collect()
- assert(result11.size === 7)
- for(i <- 0 until 6) {
- if (!result11(i).getBoolean(6)) {
- fail(s"optional Boolean value in result row $i not true")
- }
- if (result11(i).getInt(7) != i * 6) {
- fail(s"optional Int value in result row $i should be ${6*i}")
- }
- }
-
- val query12 = sql("SELECT * FROM testfiltersource WHERE mystring >= \"50\"")
- assert(
- query12.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result12 = query12.collect()
- assert(result12.size === 54)
- assert(result12(0).getString(2) == "6")
- assert(result12(4).getString(2) == "50")
- assert(result12(53).getString(2) == "99")
-
- val query13 = sql("SELECT * FROM testfiltersource WHERE mystring > \"50\"")
- assert(
- query13.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result13 = query13.collect()
- assert(result13.size === 53)
- assert(result13(0).getString(2) == "6")
- assert(result13(4).getString(2) == "51")
- assert(result13(52).getString(2) == "99")
-
- val query14 = sql("SELECT * FROM testfiltersource WHERE mystring <= \"50\"")
- assert(
- query14.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result14 = query14.collect()
- assert(result14.size === 148)
- assert(result14(0).getString(2) == "0")
- assert(result14(46).getString(2) == "50")
- assert(result14(147).getString(2) == "200")
-
- val query15 = sql("SELECT * FROM testfiltersource WHERE mystring < \"50\"")
- assert(
- query15.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
- "Top operator should be ParquetTableScan after pushdown")
- val result15 = query15.collect()
- assert(result15.size === 147)
- assert(result15(0).getString(2) == "0")
- assert(result15(46).getString(2) == "100")
- assert(result15(146).getString(2) == "200")
}
test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
- val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10")
- assert(query.collect().size === 10)
- }
-
- test("Importing nested Parquet file (Addressbook)") {
- val result = TestSQLContext
- .parquetFile(ParquetTestData.testNestedDir1.toString)
- .toSchemaRDD
- .collect()
- assert(result != null)
- assert(result.size === 2)
- val first_record = result(0)
- val second_record = result(1)
- assert(first_record != null)
- assert(second_record != null)
- assert(first_record.size === 3)
- assert(second_record(1) === null)
- assert(second_record(2) === null)
- assert(second_record(0) === "A. Nonymous")
- assert(first_record(0) === "Julien Le Dem")
- val first_owner_numbers = first_record(1)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
- val first_contacts = first_record(2)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
- assert(first_owner_numbers != null)
- assert(first_owner_numbers(0) === "555 123 4567")
- assert(first_owner_numbers(2) === "XXX XXX XXXX")
- assert(first_contacts(0)
- .asInstanceOf[CatalystConverter.StructScalaType[_]].size === 2)
- val first_contacts_entry_one = first_contacts(0)
- .asInstanceOf[CatalystConverter.StructScalaType[_]]
- assert(first_contacts_entry_one(0) === "Dmitriy Ryaboy")
- assert(first_contacts_entry_one(1) === "555 987 6543")
- val first_contacts_entry_two = first_contacts(1)
- .asInstanceOf[CatalystConverter.StructScalaType[_]]
- assert(first_contacts_entry_two(0) === "Chris Aniszczyk")
- }
-
- test("Importing nested Parquet file (nested numbers)") {
- val result = TestSQLContext
- .parquetFile(ParquetTestData.testNestedDir2.toString)
- .toSchemaRDD
- .collect()
- assert(result.size === 1, "number of top-level rows incorrect")
- assert(result(0).size === 5, "number of fields in row incorrect")
- assert(result(0)(0) === 1)
- assert(result(0)(1) === 7)
- val subresult1 = result(0)(2).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
- assert(subresult1.size === 3)
- assert(subresult1(0) === (1.toLong << 32))
- assert(subresult1(1) === (1.toLong << 33))
- assert(subresult1(2) === (1.toLong << 34))
- val subresult2 = result(0)(3)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
- .asInstanceOf[CatalystConverter.StructScalaType[_]]
- assert(subresult2.size === 2)
- assert(subresult2(0) === 2.5)
- assert(subresult2(1) === false)
- val subresult3 = result(0)(4)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
- assert(subresult3.size === 2)
- assert(subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 2)
- val subresult4 = subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
- assert(subresult4(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7)
- assert(subresult4(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8)
- assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 1)
- assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9)
- }
-
- test("Simple query on addressbook") {
- val data = TestSQLContext
- .parquetFile(ParquetTestData.testNestedDir1.toString)
- .toSchemaRDD
- val tmp = data.where('owner === "Julien Le Dem").select('owner as 'a, 'contacts as 'c).collect()
- assert(tmp.size === 1)
- assert(tmp(0)(0) === "Julien Le Dem")
- }
-
- test("Projection in addressbook") {
- val data = parquetFile(ParquetTestData.testNestedDir1.toString).toSchemaRDD
- data.registerTempTable("data")
- val query = sql("SELECT owner, contacts[1].name FROM data")
- val tmp = query.collect()
- assert(tmp.size === 2)
- assert(tmp(0).size === 2)
- assert(tmp(0)(0) === "Julien Le Dem")
- assert(tmp(0)(1) === "Chris Aniszczyk")
- assert(tmp(1)(0) === "A. Nonymous")
- assert(tmp(1)(1) === null)
- }
-
- test("Simple query on nested int data") {
- val data = parquetFile(ParquetTestData.testNestedDir2.toString).toSchemaRDD
- data.registerTempTable("data")
- val result1 = sql("SELECT entries[0].value FROM data").collect()
- assert(result1.size === 1)
- assert(result1(0).size === 1)
- assert(result1(0)(0) === 2.5)
- val result2 = sql("SELECT entries[0] FROM data").collect()
- assert(result2.size === 1)
- val subresult1 = result2(0)(0).asInstanceOf[CatalystConverter.StructScalaType[_]]
- assert(subresult1.size === 2)
- assert(subresult1(0) === 2.5)
- assert(subresult1(1) === false)
- val result3 = sql("SELECT outerouter FROM data").collect()
- val subresult2 = result3(0)(0)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
- assert(subresult2(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7)
- assert(subresult2(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8)
- assert(result3(0)(0)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]](1)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
- .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9)
- }
-
- test("nested structs") {
- val data = parquetFile(ParquetTestData.testNestedDir3.toString)
- .toSchemaRDD
- data.registerTempTable("data")
- val result1 = sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect()
- assert(result1.size === 1)
- assert(result1(0).size === 1)
- assert(result1(0)(0) === false)
- val result2 = sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect()
- assert(result2.size === 1)
- assert(result2(0).size === 1)
- assert(result2(0)(0) === true)
- val result3 = sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect()
- assert(result3.size === 1)
- assert(result3(0).size === 1)
- assert(result3(0)(0) === false)
- }
-
- test("simple map") {
- val data = TestSQLContext
- .parquetFile(ParquetTestData.testNestedDir4.toString)
- .toSchemaRDD
- data.registerTempTable("mapTable")
- val result1 = sql("SELECT data1 FROM mapTable").collect()
- assert(result1.size === 1)
- assert(result1(0)(0)
- .asInstanceOf[CatalystConverter.MapScalaType[String, _]]
- .getOrElse("key1", 0) === 1)
- assert(result1(0)(0)
- .asInstanceOf[CatalystConverter.MapScalaType[String, _]]
- .getOrElse("key2", 0) === 2)
- val result2 = sql("""SELECT data1["key1"] FROM mapTable""").collect()
- assert(result2(0)(0) === 1)
- }
-
- test("map with struct values") {
- val data = parquetFile(ParquetTestData.testNestedDir4.toString).toSchemaRDD
- data.registerTempTable("mapTable")
- val result1 = sql("SELECT data2 FROM mapTable").collect()
- assert(result1.size === 1)
- val entry1 = result1(0)(0)
- .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
- .getOrElse("seven", null)
- assert(entry1 != null)
- assert(entry1(0) === 42)
- assert(entry1(1) === "the answer")
- val entry2 = result1(0)(0)
- .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
- .getOrElse("eight", null)
- assert(entry2 != null)
- assert(entry2(0) === 49)
- assert(entry2(1) === null)
- val result2 = sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""").collect()
- assert(result2.size === 1)
- assert(result2(0)(0) === 42.toLong)
- assert(result2(0)(1) === "the answer")
- }
-
- test("Writing out Addressbook and reading it back in") {
- // TODO: find out why CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
- // has no effect in this test case
- val tmpdir = Utils.createTempDir()
- Utils.deleteRecursively(tmpdir)
- val result = parquetFile(ParquetTestData.testNestedDir1.toString).toSchemaRDD
- result.saveAsParquetFile(tmpdir.toString)
- parquetFile(tmpdir.toString)
- .toSchemaRDD
- .registerTempTable("tmpcopy")
- val tmpdata = sql("SELECT owner, contacts[1].name FROM tmpcopy").collect()
- assert(tmpdata.size === 2)
- assert(tmpdata(0).size === 2)
- assert(tmpdata(0)(0) === "Julien Le Dem")
- assert(tmpdata(0)(1) === "Chris Aniszczyk")
- assert(tmpdata(1)(0) === "A. Nonymous")
- assert(tmpdata(1)(1) === null)
- Utils.deleteRecursively(tmpdir)
- }
-
- test("Writing out Map and reading it back in") {
- val data = parquetFile(ParquetTestData.testNestedDir4.toString).toSchemaRDD
- val tmpdir = Utils.createTempDir()
- Utils.deleteRecursively(tmpdir)
- data.saveAsParquetFile(tmpdir.toString)
- parquetFile(tmpdir.toString)
- .toSchemaRDD
- .registerTempTable("tmpmapcopy")
- val result1 = sql("""SELECT data1["key2"] FROM tmpmapcopy""").collect()
- assert(result1.size === 1)
- assert(result1(0)(0) === 2)
- val result2 = sql("SELECT data2 FROM tmpmapcopy").collect()
- assert(result2.size === 1)
- val entry1 = result2(0)(0)
- .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
- .getOrElse("seven", null)
- assert(entry1 != null)
- assert(entry1(0) === 42)
- assert(entry1(1) === "the answer")
- val entry2 = result2(0)(0)
- .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
- .getOrElse("eight", null)
- assert(entry2 != null)
- assert(entry2(0) === 49)
- assert(entry2(1) === null)
- val result3 = sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""").collect()
- assert(result3.size === 1)
- assert(result3(0)(0) === 42.toLong)
- assert(result3(0)(1) === "the answer")
- Utils.deleteRecursively(tmpdir)
- }
-
- test("read/write fixed-length decimals") {
- for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
- val tempDir = getTempFilePath("parquetTest").getCanonicalPath
- val data = sparkContext.parallelize(0 to 1000)
- .map(i => NumericData(i, i / 100.0))
- .select('i, 'd cast DecimalType(precision, scale))
- data.saveAsParquetFile(tempDir)
- checkAnswer(parquetFile(tempDir), data.toSchemaRDD.collect().toSeq)
- }
-
- // Decimals with precision above 18 are not yet supported
- intercept[RuntimeException] {
- val tempDir = getTempFilePath("parquetTest").getCanonicalPath
- val data = sparkContext.parallelize(0 to 1000)
- .map(i => NumericData(i, i / 100.0))
- .select('i, 'd cast DecimalType(19, 10))
- data.saveAsParquetFile(tempDir)
- checkAnswer(parquetFile(tempDir), data.toSchemaRDD.collect().toSeq)
- }
-
- // Unlimited-length decimals are not yet supported
- intercept[RuntimeException] {
- val tempDir = getTempFilePath("parquetTest").getCanonicalPath
- val data = sparkContext.parallelize(0 to 1000)
- .map(i => NumericData(i, i / 100.0))
- .select('i, 'd cast DecimalType.Unlimited)
- data.saveAsParquetFile(tempDir)
- checkAnswer(parquetFile(tempDir), data.toSchemaRDD.collect().toSeq)
- }
- }
-
- def checkFilter(predicate: Predicate, filterClass: Class[_ <: FilterPredicate]): Unit = {
- val filter = ParquetFilters.createFilter(predicate)
- assert(filter.isDefined)
- assert(filter.get.getClass == filterClass)
- }
-
- test("Pushdown IsNull predicate") {
- checkFilter('a.int.isNull, classOf[Operators.Eq[Integer]])
- checkFilter('a.long.isNull, classOf[Operators.Eq[java.lang.Long]])
- checkFilter('a.float.isNull, classOf[Operators.Eq[java.lang.Float]])
- checkFilter('a.double.isNull, classOf[Operators.Eq[java.lang.Double]])
- checkFilter('a.string.isNull, classOf[Operators.Eq[Binary]])
- checkFilter('a.binary.isNull, classOf[Operators.Eq[Binary]])
- }
-
- test("Pushdown IsNotNull predicate") {
- checkFilter('a.int.isNotNull, classOf[Operators.NotEq[Integer]])
- checkFilter('a.long.isNotNull, classOf[Operators.NotEq[java.lang.Long]])
- checkFilter('a.float.isNotNull, classOf[Operators.NotEq[java.lang.Float]])
- checkFilter('a.double.isNotNull, classOf[Operators.NotEq[java.lang.Double]])
- checkFilter('a.string.isNotNull, classOf[Operators.NotEq[Binary]])
- checkFilter('a.binary.isNotNull, classOf[Operators.NotEq[Binary]])
- }
-
- test("Pushdown EqualTo predicate") {
- checkFilter('a.int === 0, classOf[Operators.Eq[Integer]])
- checkFilter('a.long === 0.toLong, classOf[Operators.Eq[java.lang.Long]])
- checkFilter('a.float === 0.toFloat, classOf[Operators.Eq[java.lang.Float]])
- checkFilter('a.double === 0.toDouble, classOf[Operators.Eq[java.lang.Double]])
- checkFilter('a.string === "foo", classOf[Operators.Eq[Binary]])
- checkFilter('a.binary === "foo".getBytes, classOf[Operators.Eq[Binary]])
- }
-
- test("Pushdown Not(EqualTo) predicate") {
- checkFilter(!('a.int === 0), classOf[Operators.NotEq[Integer]])
- checkFilter(!('a.long === 0.toLong), classOf[Operators.NotEq[java.lang.Long]])
- checkFilter(!('a.float === 0.toFloat), classOf[Operators.NotEq[java.lang.Float]])
- checkFilter(!('a.double === 0.toDouble), classOf[Operators.NotEq[java.lang.Double]])
- checkFilter(!('a.string === "foo"), classOf[Operators.NotEq[Binary]])
- checkFilter(!('a.binary === "foo".getBytes), classOf[Operators.NotEq[Binary]])
- }
-
- test("Pushdown LessThan predicate") {
- checkFilter('a.int < 0, classOf[Operators.Lt[Integer]])
- checkFilter('a.long < 0.toLong, classOf[Operators.Lt[java.lang.Long]])
- checkFilter('a.float < 0.toFloat, classOf[Operators.Lt[java.lang.Float]])
- checkFilter('a.double < 0.toDouble, classOf[Operators.Lt[java.lang.Double]])
- checkFilter('a.string < "foo", classOf[Operators.Lt[Binary]])
- checkFilter('a.binary < "foo".getBytes, classOf[Operators.Lt[Binary]])
- }
-
- test("Pushdown LessThanOrEqual predicate") {
- checkFilter('a.int <= 0, classOf[Operators.LtEq[Integer]])
- checkFilter('a.long <= 0.toLong, classOf[Operators.LtEq[java.lang.Long]])
- checkFilter('a.float <= 0.toFloat, classOf[Operators.LtEq[java.lang.Float]])
- checkFilter('a.double <= 0.toDouble, classOf[Operators.LtEq[java.lang.Double]])
- checkFilter('a.string <= "foo", classOf[Operators.LtEq[Binary]])
- checkFilter('a.binary <= "foo".getBytes, classOf[Operators.LtEq[Binary]])
- }
-
- test("Pushdown GreaterThan predicate") {
- checkFilter('a.int > 0, classOf[Operators.Gt[Integer]])
- checkFilter('a.long > 0.toLong, classOf[Operators.Gt[java.lang.Long]])
- checkFilter('a.float > 0.toFloat, classOf[Operators.Gt[java.lang.Float]])
- checkFilter('a.double > 0.toDouble, classOf[Operators.Gt[java.lang.Double]])
- checkFilter('a.string > "foo", classOf[Operators.Gt[Binary]])
- checkFilter('a.binary > "foo".getBytes, classOf[Operators.Gt[Binary]])
- }
-
- test("Pushdown GreaterThanOrEqual predicate") {
- checkFilter('a.int >= 0, classOf[Operators.GtEq[Integer]])
- checkFilter('a.long >= 0.toLong, classOf[Operators.GtEq[java.lang.Long]])
- checkFilter('a.float >= 0.toFloat, classOf[Operators.GtEq[java.lang.Float]])
- checkFilter('a.double >= 0.toDouble, classOf[Operators.GtEq[java.lang.Double]])
- checkFilter('a.string >= "foo", classOf[Operators.GtEq[Binary]])
- checkFilter('a.binary >= "foo".getBytes, classOf[Operators.GtEq[Binary]])
- }
-
- test("Comparison with null should not be pushed down") {
- val predicates = Seq(
- 'a.int === null,
- !('a.int === null),
-
- Literal(null) === 'a.int,
- !(Literal(null) === 'a.int),
-
- 'a.int < null,
- 'a.int <= null,
- 'a.int > null,
- 'a.int >= null,
-
- Literal(null) < 'a.int,
- Literal(null) <= 'a.int,
- Literal(null) > 'a.int,
- Literal(null) >= 'a.int
- )
-
- predicates.foreach { p =>
- assert(
- ParquetFilters.createFilter(p).isEmpty,
- "Comparison predicate with null shouldn't be pushed down")
- }
- }
-
- test("Import of simple Parquet files using glob wildcard pattern") {
- val testGlobDir = ParquetTestData.testGlobDir.toString
- val globPatterns = Array(testGlobDir + "/*/*", testGlobDir + "/spark-*/*", testGlobDir + "/?pa?k-*/*")
- globPatterns.foreach { path =>
- val result = parquetFile(path).collect()
- assert(result.size === 45)
- result.zipWithIndex.foreach {
- case (row, index) => {
- val checkBoolean =
- if ((index % 15) % 3 == 0)
- row(0) == true
- else
- row(0) == false
- assert(checkBoolean === true, s"boolean field value in line $index did not match")
- if ((index % 15) % 5 == 0) assert(row(1) === 5, s"int field value in line $index did not match")
- assert(row(2) === "abc", s"string field value in line $index did not match")
- assert(row(3) === ((index.toLong % 15) << 33), s"long value in line $index did not match")
- assert(row(4) === 2.5F, s"float field value in line $index did not match")
- assert(row(5) === 4.5D, s"double field value in line $index did not match")
- }
- }
+ withParquetTable((1 to 10).map(Tuple1.apply), "t") {
+ checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
}
}
}
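
Editorial note: the added lines above replace the hand-rolled temp-directory bookkeeping of the deleted tests with the withParquetTable fixture from the ParquetTest trait. As an illustration only, here is a minimal sketch of how such a fixture can be assembled from the same SchemaRDD calls the deleted tests make (saveAsParquetFile, parquetFile, registerTempTable, Utils.createTempDir); the object name and signature below are hypothetical and this is not the actual ParquetTest implementation.

    package org.apache.spark.sql.parquet

    import scala.reflect.runtime.universe.TypeTag

    import org.apache.spark.sql.test.TestSQLContext._
    import org.apache.spark.util.Utils

    // Hypothetical stand-in for the ParquetTest fixture used by the new suites.
    object ParquetTableFixture {
      // Writes `data` out as a Parquet file in a temp directory, registers it as a
      // temporary table named `tableName`, runs `body`, and always cleans up.
      def withParquetTable[T <: Product : TypeTag]
          (data: Seq[T], tableName: String)(body: => Unit): Unit = {
        val dir = Utils.createTempDir()
        Utils.deleteRecursively(dir)  // saveAsParquetFile expects a non-existent path
        try {
          sparkContext.parallelize(data)                 // RDD[T] -> SchemaRDD via createSchemaRDD
            .saveAsParquetFile(dir.getCanonicalPath)
          parquetFile(dir.getCanonicalPath).registerTempTable(tableName)
          body
        } finally {
          Utils.deleteRecursively(dir)
        }
      }
    }

    // Usage, mirroring the "simple projection" test in the deleted suite below:
    //   ParquetTableFixture.withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
    //     assert(sql("SELECT _1 FROM t").collect().length == 10)
    //   }
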
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
deleted file mode 100644
index 7b3f8c22af..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.test.TestSQLContext._
-
-/**
- * A test suite that tests various Parquet queries.
- */
-class ParquetQuerySuite2 extends QueryTest with ParquetTest {
- val sqlContext = TestSQLContext
-
- test("simple projection") {
- withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
- checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
- }
- }
-
- test("appending") {
- val data = (0 until 10).map(i => (i, i.toString))
- withParquetTable(data, "t") {
- sql("INSERT INTO t SELECT * FROM t")
- checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
- }
- }
-
- test("self-join") {
- // 4 rows, cells of column 1 of row 2 and row 4 are null
- val data = (1 to 4).map { i =>
- val maybeInt = if (i % 2 == 0) None else Some(i)
- (maybeInt, i.toString)
- }
-
- withParquetTable(data, "t") {
- val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
- val queryOutput = selfJoin.queryExecution.analyzed.output
-
- assertResult(4, s"Field count mismatches")(queryOutput.size)
- assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
- queryOutput.filter(_.name == "_1").map(_.exprId).size
- }
-
- checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
- }
- }
-
- test("nested data - struct with array field") {
- val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
- withParquetTable(data, "t") {
- checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
- case Tuple1((_, Seq(string))) => Row(string)
- })
- }
- }
-
- test("nested data - array of struct") {
- val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
- withParquetTable(data, "t") {
- checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
- case Tuple1(Seq((_, string))) => Row(string)
- })
- }
- }
-
- test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
- withParquetTable((1 to 10).map(Tuple1.apply), "t") {
- checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
- }
- }
-}
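
Editorial note: the pushdown tests removed above all share one pattern: build a Catalyst Predicate with the expression DSL, run it through ParquetFilters.createFilter, and assert on the class of the resulting parquet-mr FilterPredicate. Comparisons against NULL must yield no filter at all, because under SQL's three-valued logic `a = NULL` is unknown rather than a match on null cells, so pushing it down as a Parquet Eq(null) would change query results. A standalone sketch of that check (helper and object names are hypothetical, not part of the suites):

    package org.apache.spark.sql.parquet

    import parquet.filter2.predicate.{FilterPredicate, Operators}

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate}

    object PushdownCheckSketch {
      // Translate a Catalyst predicate and verify which parquet-mr filter class
      // it maps to, mirroring the deleted checkFilter helper above.
      def assertPushdown(predicate: Predicate, expected: Class[_ <: FilterPredicate]): Unit = {
        val filter = ParquetFilters.createFilter(predicate)
        assert(filter.isDefined, s"no Parquet filter generated for $predicate")
        assert(filter.get.getClass == expected,
          s"expected ${expected.getName}, got ${filter.get.getClass.getName}")
      }

      def main(args: Array[String]): Unit = {
        // A plain integer comparison maps to the corresponding parquet-mr operator...
        assertPushdown('a.int > 0, classOf[Operators.Gt[Integer]])
        // ...but a comparison involving NULL must not produce any Parquet filter.
        assert(ParquetFilters.createFilter('a.int === Literal(null)).isEmpty,
          "null comparison should not be pushed down")
      }
    }
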