Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                        |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala      |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala         |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala      |  2
5 files changed, 21 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 279495aa64..cd7d78e684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
import java.util.Properties
-
private[spark] object SQLConf {
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
@@ -32,9 +31,12 @@ private[spark] object SQLConf {
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val CODEGEN_ENABLED = "spark.sql.codegen"
val DIALECT = "spark.sql.dialect"
+
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+ val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
+
val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
// This is only used for the thriftserver
@@ -90,6 +92,10 @@ private[sql] trait SQLConf {
/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
+ /** When true predicates will be passed to the parquet record reader when possible. */
+ private[spark] def parquetFilterPushDown =
+ getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+
/**
* When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
* that evaluates expressions found in queries. In general this custom code runs much faster
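
The new key defaults to "false", so Parquet filter pushdown stays off unless it is switched on explicitly. A minimal usage sketch, not part of this commit (assumes an existing SQLContext named sqlContext; the table and column names are made up for illustration):

  // Hypothetical sketch: turn the new flag on for this SQLContext.
  // The key string matches PARQUET_FILTER_PUSHDOWN_ENABLED defined above.
  sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

  // Later Parquet scans may then hand simple column predicates to the
  // Parquet record reader instead of filtering rows only inside Spark.
  val adults = sqlContext.sql("SELECT name FROM people WHERE age > 21")
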
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cc7e0c05ff..03cd5bd627 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
val prunePushedDownFilters =
- if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+ if (sqlContext.parquetFilterPushDown) {
(filters: Seq[Expression]) => {
filters.filter { filter =>
// Note: filters cannot be pushed down to Parquet if they contain more complex
@@ -234,7 +234,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
projectList,
filters,
prunePushedDownFilters,
- ParquetTableScan(_, relation, filters)) :: Nil
+ ParquetTableScan(
+ _,
+ relation,
+ if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
case _ => Nil
}
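
As the note in the hunk above indicates, prunePushedDownFilters keeps only predicates that can be expressed as simple column filters; anything more complex stays in Spark's own Filter operator. A rough illustration with hypothetical queries against the test table registered later in this change (column names are assumed):

  // Likely pushdown-eligible: a plain comparison between one column and a literal.
  sqlContext.sql("SELECT * FROM testfiltersource WHERE myint < 100")

  // Likely not pushed down: the predicate is built from a compound expression,
  // so it would be evaluated by Spark after the Parquet scan.
  sqlContext.sql("SELECT * FROM testfiltersource WHERE myint + mylong > 100")
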
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 1e67799e83..9a3f6d388d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -43,8 +43,6 @@ import org.apache.spark.sql.parquet.ParquetColumns._
private[sql] object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
- // set this to false if pushdown should be disabled
- val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
def createRecordFilter(filterExpressions: Seq[Expression]): Filter = {
val filters: Seq[CatalystFilter] = filterExpressions.collect {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 74c43e053b..5f93279a08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -23,6 +23,8 @@ import java.text.SimpleDateFormat
import java.util.concurrent.{Callable, TimeUnit}
import java.util.{ArrayList, Collections, Date, List => JList}
+import org.apache.spark.annotation.DeveloperApi
+
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try
@@ -52,6 +54,7 @@ import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
/**
+ * :: DeveloperApi ::
* Parquet table scan operator. Imports the file that backs the given
* [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
*/
@@ -108,15 +111,11 @@ case class ParquetTableScan(
// Note 1: the input format ignores all predicates that cannot be expressed
// as simple column predicate filters in Parquet. Here we just record
// the whole pruning predicate.
- // Note 2: you can disable filter predicate pushdown by setting
- // "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf.
- if (columnPruningPred.length > 0 &&
- sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
-
+ if (columnPruningPred.length > 0) {
// Set this in configuration of ParquetInputFormat, needed for RowGroupFiltering
val filter: Filter = ParquetFilters.createRecordFilter(columnPruningPred)
if (filter != null){
- val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate()
+ val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate
ParquetInputFormat.setFilterPredicate(conf, filterPredicate)
}
}
@@ -193,6 +192,7 @@ case class ParquetTableScan(
}
/**
+ * :: DeveloperApi ::
* Operator that acts as a sink for queries on RDDs and can be used to
* store the output inside a directory of Parquet files. This operator
* is similar to Hive's INSERT INTO TABLE operation in the sense that
@@ -208,6 +208,7 @@ case class ParquetTableScan(
* cause unpredicted behaviour and therefore results in a RuntimeException
* (only detected via filename pattern so will not catch all cases).
*/
+@DeveloperApi
case class InsertIntoParquetTable(
relation: ParquetRelation,
child: SparkPlan,
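
With the SparkConf check removed, the scan installs a record filter whenever pruning predicates reach it; the strategy above already withholds them when pushdown is disabled. In parquet-mr terms, the calls in that hunk reduce to roughly the following simplified, hypothetical sketch (not code from this commit; the column name and bound are made up):

  import org.apache.hadoop.conf.Configuration
  import parquet.filter2.predicate.FilterApi
  import parquet.hadoop.ParquetInputFormat

  // A stand-in for the FilterPredicate that ParquetFilters would build from
  // a Catalyst expression such as `myint < 100`.
  val hadoopConf = new Configuration()
  val predicate = FilterApi.lt(FilterApi.intColumn("myint"), Int.box(100))

  // Registering it in the job configuration lets Parquet's record reader
  // (and row-group filtering) apply the predicate while reading.
  ParquetInputFormat.setFilterPredicate(hadoopConf, predicate)
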
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 3cccafe92d..80a3e0b4c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -95,6 +95,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
testRDD.registerTempTable("testsource")
parquetFile(ParquetTestData.testFilterDir.toString)
.registerTempTable("testfiltersource")
+
+ setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
}
override def afterAll() {
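
Because beforeAll now enables pushdown for the whole suite, a test that wants to compare both code paths would have to flip the flag around the query itself. A hypothetical sketch in the style of this suite (the column name is assumed; setConf and sql are used unqualified as in the hunk above):

  // Hypothetical: run the same predicate with pushdown off and on and check
  // that the answers agree.
  setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "false")
  val withoutPushdown = sql("SELECT * FROM testfiltersource WHERE myint < 100").collect()
  setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
  val withPushdown = sql("SELECT * FROM testfiltersource WHERE myint < 100").collect()
  assert(withoutPushdown.toSeq === withPushdown.toSeq)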