Author:    Takeshi YAMAMURO <linguin.m.s@gmail.com>  2016-02-10 09:45:13 +0800
Committer: Yin Huai <yhuai@databricks.com>           2016-02-10 09:45:13 +0800
commit 6f710f9fd4f85370557b7705020ff16f2385e645 (patch)
tree   7c2d45aff037d3c8b07f212a01e0db51233b6c55 /sql
parent 9267bc68fab65c6a798e065a1dbe0f5171df3077 (diff)
[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter
Input: SELECT * FROM jdbcTable WHERE col0 = 'xxx'

Current plan:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;@2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
+- Filter (col0#0 = xxx)
   +- Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;@2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```

This patch enables the plan below, where the redundant Spark-side Filter is removed from the physical plan:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;@2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;@2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #10427 from maropu/RemoveFilterInJdbcScan.
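For readers reproducing the plans above, a minimal sketch, assuming Spark 1.6-era APIs and a reachable PostgreSQL instance holding the testRel table; the URL and user are taken verbatim from the quoted plans:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ExplainJdbcPushdown {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pushdown").setMaster("local[1]"))
    val sqlContext = new SQLContext(sc)
    // Connection settings copied from the plans above; adjust for your setup.
    val df = sqlContext.read.format("jdbc").options(Map(
      "url" -> "jdbc:postgresql:postgres",
      "dbtable" -> "testRel",
      "user" -> "maropu")).load()
    // With this patch, the executed plan for a pushable predicate is a bare
    // JDBC scan (PushedFilters: [EqualTo(col0,xxx)]) with no Filter above it.
    df.filter(df("col0") === "xxx").explain()
    sc.stop()
  }
}
```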
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala       |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala  |  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala                           | 70
3 files changed, 56 insertions(+), 21 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index d867e144e5..befba867bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -189,7 +189,7 @@ private[sql] object JDBCRDD extends Logging {
* Turns a single Filter into a String representing a SQL expression.
* Returns None for an unhandled filter.
*/
- private def compileFilter(f: Filter): Option[String] = {
+ private[jdbc] def compileFilter(f: Filter): Option[String] = {
Option(f match {
case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
case EqualNullSafe(attr, value) =>
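A side note on the idiom above: each matched case yields a SQL fragment, and the surrounding `Option(...)` converts a `null` fall-through into `None`, which is how a filter is reported as unhandled. A standalone sketch of the same pattern (simplified filter types, not Spark's `Filter` hierarchy; the `null` fall-through is an assumption consistent with the `Option` wrapper and the doc comment above):
```scala
object CompileFilterSketch {
  sealed trait SimpleFilter
  case class Eq(attr: String, value: String) extends SimpleFilter
  case class Opaque(desc: String) extends SimpleFilter

  // Matched cases return a WHERE-clause fragment; anything else falls
  // through to null, which Option(...) turns into None (unhandled).
  def compile(f: SimpleFilter): Option[String] = Option(f match {
    case Eq(attr, value) => s"$attr = '$value'"
    case _ => null
  })

  def main(args: Array[String]): Unit = {
    assert(compile(Eq("col0", "xxx")) == Some("col0 = 'xxx'"))
    assert(compile(Opaque("(THEID + 1) < 2")).isEmpty)
  }
}
```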
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 572be823ca..ee6373d03e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(
override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
+ // Check if JDBCRDD.compileFilter can accept input filters
+ override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+ filters.filter(JDBCRDD.compileFilter(_).isEmpty)
+ }
+
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
JDBCRDD.scanTable(
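For context, `BaseRelation#unhandledFilters` defaults to returning its whole input, i.e. Spark assumes the source handled nothing and re-applies every predicate in its own `Filter` operator. The override above narrows that to exactly the filters `compileFilter` cannot translate. A minimal standalone sketch of the contract (simplified `Filter` representation and a toy `compileFilter`, not Spark's):
```scala
object UnhandledFiltersSketch {
  type Filter = String

  // Toy stand-in for JDBCRDD.compileFilter: pretend only simple equality
  // predicates translate into a JDBC WHERE clause.
  def compileFilter(f: Filter): Option[String] =
    if (f.matches("""\w+ = .+""")) Some(f) else None

  // Mirrors the override above: a filter is unhandled iff it cannot compile.
  def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filter(compileFilter(_).isEmpty)

  def main(args: Array[String]): Unit = {
    val filters = Array("col0 = 'xxx'", "(THEID + 1) < 2")
    // Spark keeps a Filter operator only for what comes back here.
    println(unhandledFilters(filters).mkString(", ")) // (THEID + 1) < 2
  }
}
```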
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 518607543b..7a0f7abaa1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -22,12 +22,12 @@ import java.sql.{Date, DriverManager, Timestamp}
import java.util.{Calendar, GregorianCalendar, Properties}
import org.h2.jdbc.JdbcSQLException
-import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
import org.apache.spark.sql.sources._
@@ -183,26 +183,34 @@ class JDBCSuite extends SparkFunSuite
}
test("SELECT * WHERE (simple predicates)") {
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
+ def checkPushdown(df: DataFrame): DataFrame = {
+ val parentPlan = df.queryExecution.executedPlan
+ // Check if SparkPlan Filter is removed in a physical plan and
+ // the plan only has PhysicalRDD to scan JDBCRelation.
+ assert(parentPlan.isInstanceOf[PhysicalRDD])
+ assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+ df
+ }
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
.collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
.collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
.collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+ "AND THEID = 2")).collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
// This is a test to reflect discussion in SPARK-12218.
// The older versions of spark have this kind of bugs in parquet data source.
@@ -210,6 +218,28 @@ class JDBCSuite extends SparkFunSuite
val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')")
assert(df1.collect.toSet === Set(Row("mary", 2)))
assert(df2.collect.toSet === Set(Row("mary", 2)))
+
+ def checkNotPushdown(df: DataFrame): DataFrame = {
+ val parentPlan = df.queryExecution.executedPlan
+ // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
+ // cannot compile given predicates.
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
+ val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
+ assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.Filter])
+ df
+ }
+ assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
+ assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
+ }
+
+ test("SELECT COUNT(1) WHERE (predicates)") {
+ // Check if an answer is correct when Filter is removed from operations such as count() which
+ // does not require any columns. In some data sources, e.g., Parquet, `requiredColumns` in
+ // org.apache.spark.sql.sources.interfaces is not given in logical plans, but some filters
+ // are applied for columns with Filter producing wrong results. On the other hand, JDBCRDD
+ // correctly handles this case by assigning `requiredColumns` properly. See PR 10427 for more
+ // discussions.
+ assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1)))
}
test("SELECT * WHERE (quoted strings)") {