author     Cheng Lian <lian@databricks.com>          2015-02-01 18:52:39 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-01 18:52:39 -0800
commit     ec1003219b8978291abca2fc409ee61b1bb40a38 (patch)
tree       d41982ef8a8fc3284a2a633192691889c174d444 /sql
parent     8cf4a1f02e40f37f940f6a347c078f5879585bf4 (diff)
[SPARK-5465] [SQL] Fixes filter push-down for Parquet data source
Not all Catalyst filter expressions can be converted to Parquet filter predicates. We should try to convert each individual predicate and then collect those convertible ones.

Author: Cheng Lian <lian@databricks.com>

Closes #4255 from liancheng/spark-5465 and squashes the following commits:

14ccd37 [Cheng Lian] Fixes filter push-down for Parquet data source
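The idea is easiest to see in isolation. Below is a minimal, self-contained Scala sketch of the fix, not Spark's actual code: Pred, ParquetPred, convert, and parquetAnd are simplified stand-ins for Catalyst expressions, Parquet FilterPredicates, ParquetFilters.createFilter, and FilterApi.and. Reducing the predicates into one conjunction before converting means a single unconvertible predicate disables push-down entirely; converting each predicate first and then AND-ing the results keeps every convertible one.

// Simplified stand-ins (not Spark's types): Pred plays the role of a Catalyst
// expression, ParquetPred that of a Parquet FilterPredicate.
sealed trait Pred
case class Convertible(repr: String) extends Pred
case class Unsupported(repr: String) extends Pred

case class ParquetPred(repr: String)

// Per-predicate conversion; returns None for expressions Parquet cannot evaluate
// (the role played by ParquetFilters.createFilter).
def convert(p: Pred): Option[ParquetPred] = p match {
  case Convertible(r) => Some(ParquetPred(r))
  case Unsupported(_) => None
}

// Stand-in for FilterApi.and: combines two converted predicates into a conjunction.
def parquetAnd(l: ParquetPred, r: ParquetPred): ParquetPred =
  ParquetPred(s"and(${l.repr}, ${r.repr})")

val predicates: Seq[Pred] =
  Seq(Convertible("a = 1"), Unsupported("udf(b)"), Convertible("c < 10"))

// Old ordering (reduce first, convert second): the single conjunction contains an
// unconvertible predicate, so the conversion fails and nothing is pushed down.

// New ordering (convert each predicate, then AND the convertible ones), mirroring
// predicates.flatMap(ParquetFilters.createFilter).reduceOption(FilterApi.and):
val pushed: Option[ParquetPred] = predicates.flatMap(convert).reduceOption(parquetAnd)
// pushed == Some(ParquetPred("and(a = 1, c < 10)"))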
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 1b50afbbab..1e794cad73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -20,26 +20,26 @@ import java.util.{List => JList}
import scala.collection.JavaConversions._
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
+import parquet.filter2.predicate.FilterApi
import parquet.hadoop.ParquetInputFormat
import parquet.hadoop.util.ContextUtil
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{Partition => SparkPartition, Logging}
import org.apache.spark.rdd.{NewHadoopPartition, RDD}
-import org.apache.spark.sql.{SQLConf, Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition}
/**
* Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
* required is `path`, which should be the location of a collection of, optionally partitioned,
* parquet files.
*/
@@ -193,10 +193,12 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
}
- // Push down filters when possible
+ // Push down filters when possible. Notice that not all filters can be converted to Parquet
+ // filter predicate. Here we try to convert each individual predicate and only collect those
+ // convertible ones.
predicates
- .reduceOption(And)
.flatMap(ParquetFilters.createFilter)
+ .reduceOption(FilterApi.and)
.filter(_ => sqlContext.conf.parquetFilterPushDown)
.foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
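For context, the doc comment in the first hunk above describes the data source DDL that ParquetRelation2 backs. A small usage sketch of that syntax, with the table name, path, and column names as placeholders:

// Registers a Parquet-backed temporary table via the data sources API; the path
// below is a placeholder for an actual Parquet directory or file.
sqlContext.sql("""
  CREATE TEMPORARY TABLE parquet_events
  USING org.apache.spark.sql.parquet
  OPTIONS (path "/data/events.parquet")
""")

// Filters on the registered table can then be pushed down to Parquet when
// spark.sql.parquet.filterPushdown is enabled; with this fix, convertible
// predicates are pushed down even if others in the same query are not.
sqlContext.sql("SELECT * FROM parquet_events WHERE a = 1 AND c < 10")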