author	hyukjinkwon <gurwls223@gmail.com>	2016-06-17 17:11:38 -0700
committer	Cheng Lian <lian@databricks.com>	2016-06-17 17:11:38 -0700
commit	ebb9a3b6fd834e2c856a192b4455aab83e9c4dc8 (patch)
tree	06d8e79bcba69217559bdd44a61fd4a94d885cde /sql
parent	7d65a0db4a231882200513836f2720f59b35f364 (diff)
[SPARK-15916][SQL] JDBC filter push down should respect operator precedence
## What changes were proposed in this pull request?

This PR fixes the problem that operator precedence is lost when a where-clause expression is pushed down to the JDBC layer.

**Case 1:** For the SQL `select * from table where (a or b) and c`, the where clause is wrongly converted to the JDBC where clause `a or (b and c)` after filter push down. The consequence is that JDBC may return fewer or more rows than expected.

**Case 2:** For the SQL `select * from table where always_false_condition`, the result table may not be empty if the JDBC RDD is partitioned using where clauses:

```
spark.read.jdbc(url, table, predicates = Array("partition 1 where clause", "partition 2 where clause", ...))
```

## How was this patch tested?

Unit test.

This PR also closes #13640.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Sean Zhong <seanzhong@databricks.com>

Closes #13743 from clockfly/SPARK-15916.
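To make Case 1 concrete, here is a minimal Scala sketch of the precedence problem. The filter strings are hypothetical stand-ins, not actual `compileFilter` output; the fix shown is the same `.map(p => s"($p)")` wrapping applied in this patch:

```
// Hypothetical compiled filter strings; the first contains an OR.
val compiledFilters = Seq("a = 1 OR b = 2", "c = 3")

// Buggy: joining with a bare " AND " loses the grouping of the OR.
val buggyWhere = compiledFilters.mkString(" AND ")
// "a = 1 OR b = 2 AND c = 3"
// SQL gives AND higher precedence, so this parses as a = 1 OR (b = 2 AND c = 3).

// Fixed: parenthesize each compiled filter before joining.
val fixedWhere = compiledFilters.map(p => s"($p)").mkString(" AND ")
// "(a = 1 OR b = 2) AND (c = 3)" -- the intended grouping is preserved.
```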
Diffstat (limited to 'sql')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala	| 4
-rw-r--r--	sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala	| 26
2 files changed, 28 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 8d0906e574..44cfbb9fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -305,14 +305,14 @@ private[sql] class JDBCRDD(
* `filters`, but as a WHERE clause suitable for injection into a SQL query.
*/
private val filterWhereClause: String =
- filters.flatMap(JDBCRDD.compileFilter).mkString(" AND ")
+ filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ")
/**
* A WHERE clause representing both `filters`, if any, and the current partition.
*/
private def getWhereClause(part: JDBCPartition): String = {
if (part.whereClause != null && filterWhereClause.length > 0) {
- "WHERE " + filterWhereClause + " AND " + part.whereClause
+ "WHERE " + s"($filterWhereClause)" + " AND " + s"(${part.whereClause})"
} else if (part.whereClause != null) {
"WHERE " + part.whereClause
} else if (filterWhereClause.length > 0) {
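The same hazard motivates the second change in `getWhereClause` (Case 2), where a pushed-down filter is combined with a partition predicate that may itself contain an OR. A sketch using the predicate shapes from the new test below; the clause strings are illustrative, not taken from a real run:

```
// Hypothetical clause strings mirroring the test: an always-false pushed-down
// filter combined with a partition predicate that contains an OR.
val filterWhereClause = "THEID < -1000000000"
val partitionWhereClause = "THEID > 1000000000 OR NAME != 'non_exists'"

// Buggy: no parentheses around either side.
val buggy = "WHERE " + filterWhereClause + " AND " + partitionWhereClause
// WHERE THEID < -1000000000 AND THEID > 1000000000 OR NAME != 'non_exists'
// Parses as (... AND ...) OR NAME != 'non_exists', so rows still match.

// Fixed: parenthesize both sides before joining.
val fixed = "WHERE " + s"($filterWhereClause)" + " AND " + s"($partitionWhereClause)"
// WHERE (THEID < -1000000000) AND (THEID > 1000000000 OR NAME != 'non_exists')
// Correctly yields an empty result.
```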
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index abb7918ae6..d6ec40c18b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -661,4 +661,30 @@ class JDBCSuite extends SparkFunSuite
assert(oracleDialect.getJDBCType(StringType).
map(_.databaseTypeDefinition).get == "VARCHAR2(255)")
}
+
+ private def assertEmptyQuery(sqlString: String): Unit = {
+ assert(sql(sqlString).collect().isEmpty)
+ }
+
+ test("SPARK-15916: JDBC filter operator push down should respect operator precedence") {
+ val TRUE = "NAME != 'non_exists'"
+ val FALSE1 = "THEID > 1000000000"
+ val FALSE2 = "THEID < -1000000000"
+
+ assertEmptyQuery(s"SELECT * FROM foobar WHERE ($TRUE OR $FALSE1) AND $FALSE2")
+ assertEmptyQuery(s"SELECT * FROM foobar WHERE $FALSE1 AND ($FALSE2 OR $TRUE)")
+
+    // Tests JDBCPartition whereClause push down.
+ withTempTable("tempFrame") {
+ val jdbcPartitionWhereClause = s"$FALSE1 OR $TRUE"
+ val df = spark.read.jdbc(
+ urlWithUserAndPass,
+ "TEST.PEOPLE",
+ predicates = Array[String](jdbcPartitionWhereClause),
+ new Properties)
+
+ df.createOrReplaceTempView("tempFrame")
+ assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2")
+ }
+ }
}