diff options
author | Takeshi YAMAMURO <linguin.m.s@gmail.com> | 2015-12-30 13:34:37 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-12-30 13:34:37 -0800 |
commit | 5c2682b0c8fd2aeae2af1adb716ee0d5f8b85135 (patch) | |
tree | 896c6695783e92180fab7ecdb2d1fe2bf80a3dff | |
parent | 27a42c7108ced48a7f558990de2e4fc7ed340119 (diff) | |
download | spark-5c2682b0c8fd2aeae2af1adb716ee0d5f8b85135.tar.gz spark-5c2682b0c8fd2aeae2af1adb716ee0d5f8b85135.tar.bz2 spark-5c2682b0c8fd2aeae2af1adb716ee0d5f8b85135.zip |
[SPARK-12409][SPARK-12387][SPARK-12391][SQL] Support AND/OR/IN/LIKE push-down filters for JDBC
This is rework from #10386 and add more tests and LIKE push-down support.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes #10468 from maropu/SupportMorePushdownInJdbc.
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 9 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 28 |
2 files changed, 35 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 4e2f5059be..7072ee4b4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -179,6 +179,7 @@ private[sql] object JDBCRDD extends Logging { case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "'" + timestampValue + "'" case dateValue: Date => "'" + dateValue + "'" + case arrayValue: Array[Object] => arrayValue.map(compileValue).mkString(", ") case _ => value } @@ -191,13 +192,19 @@ private[sql] object JDBCRDD extends Logging { */ private def compileFilter(f: Filter): String = f match { case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" - case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}" + case Not(f) => s"(NOT (${compileFilter(f)}))" case LessThan(attr, value) => s"$attr < ${compileValue(value)}" case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" + case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'" + case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'" + case StringContains(attr, value) => s"${attr} LIKE '%${value}%'" case IsNull(attr) => s"$attr IS NULL" case IsNotNull(attr) => s"$attr IS NOT NULL" + case In(attr, value) => s"$attr IN (${compileValue(value)})" + case Or(f1, f2) => s"(${compileFilter(f1)}) OR (${compileFilter(f2)})" + case And(f1, f2) => s"(${compileFilter(f1)}) AND (${compileFilter(f2)})" case _ => null } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 4044a10ce7..00e37f107a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.PrivateMethodTester import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.Row import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -186,8 +187,26 @@ class JDBCSuite extends SparkFunSuite assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) + .collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) + .collect().size === 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) + .collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " + + "AND THEID = 2")).collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) + + // This is a test to reflect discussion in SPARK-12218. + // The older versions of spark have this kind of bugs in parquet data source. + val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')") + val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')") + assert(df1.collect.toSet === Set(Row("mary", 2))) + assert(df2.collect.toSet === Set(Row("mary", 2))) } test("SELECT * WHERE (quoted strings)") { @@ -437,7 +456,11 @@ class JDBCSuite extends SparkFunSuite val compileFilter = PrivateMethod[String]('compileFilter) def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3") - assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != 'abc'") + assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))") + assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def"))) + === "(col0 = 0) AND (col1 = 'def')") + assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi"))) + === "(col0 = 2) OR (col1 = 'ghi')") assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5") assert(doCompileFilter(LessThan("col3", Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'") @@ -445,6 +468,9 @@ class JDBCSuite extends SparkFunSuite assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5") assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3") assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3") + assert(doCompileFilter(In("col1", Array("jkl"))) === "col1 IN ('jkl')") + assert(doCompileFilter(Not(In("col1", Array("mno", "pqr")))) + === "(NOT (col1 IN ('mno', 'pqr')))") assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") } |