Diffstat (limited to 'sql/core/src')
5 files changed, 21 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 279495aa64..cd7d78e684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
 
 import java.util.Properties
 
-
 private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
   val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
@@ -32,9 +31,12 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
+
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
 
   // This is only used for the thriftserver
@@ -90,6 +92,10 @@ private[sql] trait SQLConf {
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
 
+  /** When true predicates will be passed to the parquet record reader when possible. */
+  private[spark] def parquetFilterPushDown =
+    getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+
   /**
    * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
    * that evaluates expressions found in queries. In general this custom code runs much faster
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cc7e0c05ff..03cd5bd627 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
-          if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+          if (sqlContext.parquetFilterPushDown) {
            (filters: Seq[Expression]) => {
              filters.filter { filter =>
                // Note: filters cannot be pushed down to Parquet if they contain more complex
@@ -234,7 +234,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           projectList,
           filters,
           prunePushedDownFilters,
-          ParquetTableScan(_, relation, filters)) :: Nil
+          ParquetTableScan(
+            _,
+            relation,
+            if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
 
       case _ => Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 1e67799e83..9a3f6d388d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -43,8 +43,6 @@ import org.apache.spark.sql.parquet.ParquetColumns._
 
 private[sql] object ParquetFilters {
   val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-  // set this to false if pushdown should be disabled
-  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
 
   def createRecordFilter(filterExpressions: Seq[Expression]): Filter = {
     val filters: Seq[CatalystFilter] = filterExpressions.collect {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 74c43e053b..5f93279a08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -23,6 +23,8 @@ import java.text.SimpleDateFormat
 import java.util.concurrent.{Callable, TimeUnit}
 import java.util.{ArrayList, Collections, Date, List => JList}
 
+import org.apache.spark.annotation.DeveloperApi
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.util.Try
@@ -52,6 +54,7 @@ import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
 /**
+ * :: DeveloperApi ::
  * Parquet table scan operator. Imports the file that backs the given
  * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
  */
@@ -108,15 +111,11 @@ case class ParquetTableScan(
     // Note 1: the input format ignores all predicates that cannot be expressed
     // as simple column predicate filters in Parquet. Here we just record
     // the whole pruning predicate.
-    // Note 2: you can disable filter predicate pushdown by setting
-    // "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf.
-    if (columnPruningPred.length > 0 &&
-      sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
-
+    if (columnPruningPred.length > 0) {
       // Set this in configuration of ParquetInputFormat, needed for RowGroupFiltering
       val filter: Filter = ParquetFilters.createRecordFilter(columnPruningPred)
       if (filter != null){
-        val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate()
+        val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate
         ParquetInputFormat.setFilterPredicate(conf, filterPredicate)
       }
     }
@@ -193,6 +192,7 @@ case class ParquetTableScan(
 }
 
 /**
+ * :: DeveloperApi ::
  * Operator that acts as a sink for queries on RDDs and can be used to
  * store the output inside a directory of Parquet files. This operator
  * is similar to Hive's INSERT INTO TABLE operation in the sense that
@@ -208,6 +208,7 @@ case class ParquetTableScan(
  * cause unpredicted behaviour and therefore results in a RuntimeException
  * (only detected via filename pattern so will not catch all cases).
  */
+@DeveloperApi
 case class InsertIntoParquetTable(
     relation: ParquetRelation,
     child: SparkPlan,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 3cccafe92d..80a3e0b4c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -95,6 +95,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     testRDD.registerTempTable("testsource")
     parquetFile(ParquetTestData.testFilterDir.toString)
       .registerTempTable("testfiltersource")
+
+    setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
   }
 
   override def afterAll() {
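
Taken together, the patch moves the pushdown switch from the SparkConf hint "spark.sql.hints.parquetFilterPushdown" (previously on by default) to the SQLConf key "spark.sql.parquet.filterPushdown", which now defaults to false and must be enabled explicitly, as the test setup above does. Below is a minimal sketch of how a user might enable it against this patch; the SparkContext `sc`, the "people.parquet" path, and the "people" table are illustrative assumptions, not part of the change:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // assumes an existing SparkContext `sc`

// Enable the new flag; parquetFilterPushDown above reads
// getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false"), so pushdown
// stays off unless this is set per SQLContext.
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

// Hypothetical Parquet-backed table.
sqlContext.parquetFile("people.parquet").registerTempTable("people")

// With the flag on, SparkStrategies passes the filters into ParquetTableScan,
// which registers them via ParquetInputFormat.setFilterPredicate so the
// reader can skip non-matching row groups.
val adults = sqlContext.sql("SELECT name FROM people WHERE age > 21")

Predicates that createRecordFilter cannot express as simple column predicates (see the prunePushedDownFilters change in SparkStrategies.scala) are not pushed down and are still evaluated after the scan.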