author     Reynold Xin <rxin@databricks.com>    2016-07-07 18:09:18 +0800
committer  Cheng Lian <lian@databricks.com>     2016-07-07 18:09:18 +0800
commit     986b2514013ed9ebab526f2cf3dc714cc9e480bf (patch)
tree       2c1fbd3515c18a50702bb67399339163ecd42196
parent     ab05db0b48f395543cd7d91e2ad9dd760516868b (diff)
[SPARK-16400][SQL] Remove InSet filter pushdown from Parquet
## What changes were proposed in this pull request?

This patch removes InSet filter pushdown from Parquet data source, since row-based pushdown is not beneficial to Spark and brings extra complexity to the code base.

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #14076 from rxin/SPARK-16400.
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala                            |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala    | 57
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 30
3 files changed, 18 insertions(+), 76 deletions(-)
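
For context, a minimal usage sketch (not part of the patch; the path and column name are illustrative): queries with IN predicates return the same results after this change, the predicate is simply evaluated by Spark's own Filter operator instead of being handed to parquet-mr as a row-level UserDefinedPredicate.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the path and column name below are made up for this sketch.
val spark = SparkSession.builder().appName("in-pushdown-sketch").master("local[*]").getOrCreate()
val df = spark.read.parquet("/tmp/parquet_table")

// The result is identical before and after this patch; the difference is that the IN
// predicate is no longer converted into a Parquet row-level UserDefinedPredicate and is
// instead evaluated by Spark after the reader returns the rows.
df.where("b in (0, 2)").show()
```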
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 0284ecc0d9..0c2ebb0e5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -22,7 +22,7 @@ import scala.util.Try
import org.json4s.JsonDSL._
-import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
@@ -389,6 +389,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
object StructType extends AbstractDataType {
+ /**
+ * A key used in field metadata to indicate that the field comes from the result of merging
+ * two different StructTypes that do not always contain the field. That is to say, the field
+ * might be missing (optional) from one of the StructTypes.
+ */
private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"
override private[sql] def defaultConcreteType: DataType = new StructType
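
For illustration, a minimal sketch (not from this commit) of how a merged schema might carry this marker. `StructType.metadataKeyForOptionalField` is `private[sql]`, so the literal `"_OPTIONAL_"` is used here, and the field names are hypothetical.

```scala
import org.apache.spark.sql.types._

// "_OPTIONAL_" is the value of StructType.metadataKeyForOptionalField (private[sql]).
val optionalTag = new MetadataBuilder().putBoolean("_OPTIONAL_", true).build()

// Hypothetical merged schema: "id" exists in both Parquet files being merged, while
// "extra" exists in only one, so it is tagged and filters on it are not pushed down.
val mergedSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("extra", StringType, nullable = true, metadata = optionalTag)
))
```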
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index e0a113a1b3..426263fa44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.io.Serializable
-
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
@@ -26,18 +24,10 @@ import org.apache.parquet.io.api.Binary
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
+/**
+ * Some utility function to convert Spark data source filters to Parquet filters.
+ */
private[sql] object ParquetFilters {
- case class SetInFilter[T <: Comparable[T]](
- valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
-
- override def keep(value: T): Boolean = {
- value != null && valueSet.contains(value)
- }
-
- override def canDrop(statistics: Statistics[T]): Boolean = false
-
- override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
- }
private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
@@ -154,36 +144,16 @@ private[sql] object ParquetFilters {
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
}
- private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Set[Any]) =>
- FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
- case LongType =>
- (n: String, v: Set[Any]) =>
- FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
- case FloatType =>
- (n: String, v: Set[Any]) =>
- FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
- case DoubleType =>
- (n: String, v: Set[Any]) =>
- FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
- case StringType =>
- (n: String, v: Set[Any]) =>
- FilterApi.userDefined(binaryColumn(n),
- SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
- case BinaryType =>
- (n: String, v: Set[Any]) =>
- FilterApi.userDefined(binaryColumn(n),
- SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
- }
-
/**
+ * Returns a map from name of the column to the data type, if predicate push down applies
+ * (i.e. not an optional field).
+ *
* SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
* These fields only exist in one side of merged schemas. Due to that, we can't push down filters
- * using such fields, otherwise Parquet library will throw exception. Here we filter out such
- * fields.
+ * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
+ * Here we filter out such fields.
*/
- private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match {
+ private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
case StructType(fields) =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
@@ -191,15 +161,15 @@ private[sql] object ParquetFilters {
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
- }.map(f => f.name -> f.dataType)
- case _ => Array.empty[(String, DataType)]
+ }.map(f => f.name -> f.dataType).toMap
+ case _ => Map.empty[String, DataType]
}
/**
* Converts data sources filters to Parquet filter predicates.
*/
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
- val dataTypeOf = getFieldMap(schema).toMap
+ val dataTypeOf = getFieldMap(schema)
// NOTE:
//
@@ -242,9 +212,6 @@ private[sql] object ParquetFilters {
case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) =>
makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
- case sources.In(name, valueSet) =>
- makeInSet.lift(dataTypeOf(name)).map(_(name, valueSet.toSet))
-
case sources.And(lhs, rhs) =>
// At here, it is not safe to just convert one side if we do not understand the
// other side. Here is an example used to explain the reason.
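
To make the net effect on ParquetFilters concrete, a hedged standalone sketch (not code from the patch): after this commit `createFilter` falls through to `None` for `sources.In`, so Spark keeps IN predicates in its own Filter operator, while comparison filters are still converted. Because `ParquetFilters` is `private[sql]`, a snippet like this would need to live under the `org.apache.spark.sql` package, for example in a test.

```scala
package org.apache.spark.sql // required because ParquetFilters is private[sql]

import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

object InPushdownSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("b", IntegerType)))

    // Still converted: a simple comparison maps to a Parquet FilterPredicate.
    assert(ParquetFilters.createFilter(schema, sources.EqualTo("b", 1)).isDefined)

    // No longer converted: with makeInSet removed, In falls through to None and the
    // predicate is evaluated by Spark rather than inside the Parquet reader.
    assert(ParquetFilters.createFilter(schema, sources.In("b", Array[Any](0, 2))).isEmpty)
  }
}
```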
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 84fdcfea3c..f59d474d00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -514,36 +514,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
- test("SPARK-11164: test the parquet filter in") {
- import testImplicits._
- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
- withTempPath { dir =>
- val path = s"${dir.getCanonicalPath}/table1"
- (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
-
- // When a filter is pushed to Parquet, Parquet can apply it to every row.
- // So, we can check the number of rows returned from the Parquet
- // to make sure our filter pushdown work.
- val df = spark.read.parquet(path).where("b in (0,2)")
- assert(stripSparkFilter(df).count == 3)
-
- val df1 = spark.read.parquet(path).where("not (b in (1))")
- assert(stripSparkFilter(df1).count == 3)
-
- val df2 = spark.read.parquet(path).where("not (b in (1,3) or a <= 2)")
- assert(stripSparkFilter(df2).count == 2)
-
- val df3 = spark.read.parquet(path).where("not (b in (1,3) and a <= 2)")
- assert(stripSparkFilter(df3).count == 4)
-
- val df4 = spark.read.parquet(path).where("not (a <= 2)")
- assert(stripSparkFilter(df4).count == 3)
- }
- }
- }
- }
-
test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
// Here the schema becomes as below:
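
Finally, a hedged sketch of what the core check of the removed SPARK-11164 test would look like after this patch, assuming the same ParquetFilterSuite helpers (withSQLConf, withTempPath, stripSparkFilter, testImplicits): with no row-level IN pushdown and the vectorized reader disabled, the Parquet reader is expected to return every row, so stripping Spark's Filter should yield the full row count.

```scala
// Hypothetical check reusing ParquetFilterSuite fixtures; not part of this commit.
import testImplicits._
withSQLConf(
    SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
    SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
  withTempPath { dir =>
    val path = s"${dir.getCanonicalPath}/table1"
    (1 to 5).map(i => (i.toFloat, i % 3)).toDF("a", "b").write.parquet(path)

    // With InSet pushdown removed, Parquet no longer drops rows for the IN predicate,
    // so removing Spark's own Filter is expected to leave all 5 rows, not the 3 matches.
    val df = spark.read.parquet(path).where("b in (0,2)")
    assert(stripSparkFilter(df).count == 5)
  }
}
```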