author     Michael Armbrust <michael@databricks.com>    2014-11-14 14:59:35 -0800
committer  Michael Armbrust <michael@databricks.com>    2014-11-14 14:59:35 -0800
commit     e47c38763914aaf89a7a851c5f41b7549a75615b (patch)
tree       bb2e6762bbb08b19a4a0eb32a0f1a7ea97134919 /sql/core
parent     a0300ea32a9d92bd51c72930bc3979087b0082b2 (diff)
[SPARK-4391][SQL] Configure parquet filters using SQLConf
This is more uniform with the rest of SQL configuration and allows it to be turned on and off without restarting the SparkContext. In this PR I also turn off filter pushdown by default due to a number of outstanding issues (in particular SPARK-4258). When those are fixed we should turn it back on by default.

Author: Michael Armbrust <michael@databricks.com>

Closes #3258 from marmbrus/parquetFilters and squashes the following commits:

5655bfe [Michael Armbrust] Remove extra line.
15e9a98 [Michael Armbrust] Enable filters for tests
75afd39 [Michael Armbrust] Fix comments
78fa02d [Michael Armbrust] off by default
e7f9e16 [Michael Armbrust] First draft of correctly configuring parquet filter pushdown
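For reference, a minimal sketch (not part of this commit) of turning the new flag on at runtime; it assumes an existing SQLContext instance named sqlContext and uses the key introduced in SQLConf below:

// Sketch only: enable Parquet filter pushdown for subsequent queries.
// No SparkContext restart is required; `sqlContext` is assumed to exist already.
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
// Equivalent via a SQL statement:
sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")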
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                        |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala      |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala         |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala      |  2
5 files changed, 21 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 279495aa64..cd7d78e684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
import java.util.Properties
-
private[spark] object SQLConf {
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
@@ -32,9 +31,12 @@ private[spark] object SQLConf {
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val CODEGEN_ENABLED = "spark.sql.codegen"
val DIALECT = "spark.sql.dialect"
+
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+ val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
+
val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
// This is only used for the thriftserver
@@ -90,6 +92,10 @@ private[sql] trait SQLConf {
/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
+ /** When true predicates will be passed to the parquet record reader when possible. */
+ private[spark] def parquetFilterPushDown =
+ getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+
/**
* When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
* that evaluates expressions found in queries. In general this custom code runs much faster
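The new accessor above defaults to "false", so pushdown stays off unless the key is set explicitly. A small sketch of the expected behaviour, again assuming an existing SQLContext named sqlContext:

// Sketch only: the flag resolves to false until the key is set at runtime.
assert(!sqlContext.getConf("spark.sql.parquet.filterPushdown", "false").toBoolean)
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
assert(sqlContext.getConf("spark.sql.parquet.filterPushdown").toBoolean)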
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cc7e0c05ff..03cd5bd627 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
val prunePushedDownFilters =
- if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+ if (sqlContext.parquetFilterPushDown) {
(filters: Seq[Expression]) => {
filters.filter { filter =>
// Note: filters cannot be pushed down to Parquet if they contain more complex
@@ -234,7 +234,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
projectList,
filters,
prunePushedDownFilters,
- ParquetTableScan(_, relation, filters)) :: Nil
+ ParquetTableScan(
+ _,
+ relation,
+ if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
case _ => Nil
}
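The planner change above follows a standard split: when pushdown is enabled, predicates the Parquet reader can evaluate are handed to the scan, and only the remainder keeps running as a Spark-side Filter; when it is disabled, the scan sees no predicates at all. A simplified, self-contained sketch of that split (types and names are illustrative stand-ins, not Spark's Expression/Filter classes):

// Illustrative sketch of the predicate split performed by the strategy above.
object PushdownSplitSketch {
  type Predicate = String

  // Returns (predicates pushed to the reader, predicates kept for Spark to evaluate).
  def split(filters: Seq[Predicate],
            pushdownEnabled: Boolean,
            canPushDown: Predicate => Boolean): (Seq[Predicate], Seq[Predicate]) = {
    if (pushdownEnabled) {
      (filters.filter(canPushDown), filters.filterNot(canPushDown))
    } else {
      (Nil, filters)  // pushdown disabled: everything stays in Spark
    }
  }
}

With pushdownEnabled = false this mirrors the new `if (sqlContext.parquetFilterPushDown) filters else Nil` argument passed to ParquetTableScan.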
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 1e67799e83..9a3f6d388d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -43,8 +43,6 @@ import org.apache.spark.sql.parquet.ParquetColumns._
private[sql] object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
- // set this to false if pushdown should be disabled
- val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
def createRecordFilter(filterExpressions: Seq[Expression]): Filter = {
val filters: Seq[CatalystFilter] = filterExpressions.collect {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 74c43e053b..5f93279a08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -23,6 +23,8 @@ import java.text.SimpleDateFormat
import java.util.concurrent.{Callable, TimeUnit}
import java.util.{ArrayList, Collections, Date, List => JList}
+import org.apache.spark.annotation.DeveloperApi
+
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try
@@ -52,6 +54,7 @@ import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
/**
+ * :: DeveloperApi ::
* Parquet table scan operator. Imports the file that backs the given
* [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
*/
@@ -108,15 +111,11 @@ case class ParquetTableScan(
// Note 1: the input format ignores all predicates that cannot be expressed
// as simple column predicate filters in Parquet. Here we just record
// the whole pruning predicate.
- // Note 2: you can disable filter predicate pushdown by setting
- // "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf.
- if (columnPruningPred.length > 0 &&
- sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
-
+ if (columnPruningPred.length > 0) {
// Set this in configuration of ParquetInputFormat, needed for RowGroupFiltering
val filter: Filter = ParquetFilters.createRecordFilter(columnPruningPred)
if (filter != null){
- val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate()
+ val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate
ParquetInputFormat.setFilterPredicate(conf, filterPredicate)
}
}
@@ -193,6 +192,7 @@ case class ParquetTableScan(
}
/**
+ * :: DeveloperApi ::
* Operator that acts as a sink for queries on RDDs and can be used to
* store the output inside a directory of Parquet files. This operator
* is similar to Hive's INSERT INTO TABLE operation in the sense that
@@ -208,6 +208,7 @@ case class ParquetTableScan(
* cause unpredicted behaviour and therefore results in a RuntimeException
* (only detected via filename pattern so will not catch all cases).
*/
+@DeveloperApi
case class InsertIntoParquetTable(
relation: ParquetRelation,
child: SparkPlan,
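Once a predicate reaches the scan, ParquetFilters.createRecordFilter converts it and the hunk above registers the result on the job configuration so row groups can be skipped. A hedged sketch of that last step, building the FilterPredicate directly with the parquet-mr filter2 API rather than via createRecordFilter (the `parquet.*` package prefix matches the pre-Apache parquet-mr releases this code builds against; later releases use `org.apache.parquet.*`):

import org.apache.hadoop.conf.Configuration
import parquet.filter2.predicate.{FilterApi, FilterPredicate}
import parquet.hadoop.ParquetInputFormat

// Sketch only: roughly what pushing down `WHERE myColumn > 5` amounts to.
// `myColumn` is a made-up column name for illustration.
val conf = new Configuration()
val predicate: FilterPredicate = FilterApi.gt(FilterApi.intColumn("myColumn"), Int.box(5))
ParquetInputFormat.setFilterPredicate(conf, predicate)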
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 3cccafe92d..80a3e0b4c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -95,6 +95,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
testRDD.registerTempTable("testsource")
parquetFile(ParquetTestData.testFilterDir.toString)
.registerTempTable("testfiltersource")
+
+ setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
}
override def afterAll() {
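The test change enables pushdown for the whole suite before the filter tests run. A hypothetical follow-on query against the table registered above, written as it would appear inside the suite where sql(...) comes from the test SQLContext (the column name `myint` is an assumption for illustration, not taken from the test data):

// Sketch only: with spark.sql.parquet.filterPushdown=true, a simple comparison
// predicate like this is eligible to be pushed into the Parquet record reader.
val filtered = sql("SELECT * FROM testfiltersource WHERE myint < 100").collect()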