aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpetermaxlee <petermaxlee@gmail.com>2016-07-11 22:23:32 -0700
committerReynold Xin <rxin@databricks.com>2016-07-11 22:23:32 -0700
commitc9a6762150cfd62691a6361e05d2839b110fe8d0 (patch)
tree628a1d6e9c6f4f1c984d05bfab435e049d549ebf
parentb1e5281c5cb429e338c3719c13c0b93078d7312a (diff)
downloadspark-c9a6762150cfd62691a6361e05d2839b110fe8d0.tar.gz
spark-c9a6762150cfd62691a6361e05d2839b110fe8d0.tar.bz2
spark-c9a6762150cfd62691a6361e05d2839b110fe8d0.zip
[SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
## What changes were proposed in this pull request? It would be useful to support listing the columns that are referenced by a filter. This can help simplify data source planning, because with this we would be able to implement unhandledFilters method in HadoopFsRelation. This is based on rxin's patch (#13901) and adds unit tests. ## How was this patch tested? Added a new suite FiltersSuite. Author: petermaxlee <petermaxlee@gmail.com> Author: Reynold Xin <rxin@databricks.com> Closes #14120 from petermaxlee/SPARK-16199.
-rw-r--r--project/MimaExcludes.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala71
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/FiltersSuite.scala89
3 files changed, 151 insertions, 16 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4bd6156288..56061559fe 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -35,7 +35,12 @@ import com.typesafe.tools.mima.core.ProblemFilters._
object MimaExcludes {
// Exclude rules for 2.1.x
- lazy val v21excludes = v20excludes
+ lazy val v21excludes = v20excludes ++ {
+ Seq(
+ // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references")
+ )
+ }
// Exclude rules for 2.0.x
lazy val v20excludes = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 9130e77ea5..13c0766219 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -26,7 +26,18 @@ package org.apache.spark.sql.sources
*
* @since 1.3.0
*/
-abstract class Filter
+abstract class Filter {
+ /**
+ * List of columns that are referenced by this filter.
+ * @since 2.1.0
+ */
+ def references: Array[String]
+
+ protected def findReferences(value: Any): Array[String] = value match {
+ case f: Filter => f.references
+ case _ => Array.empty
+ }
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to a value
@@ -34,7 +45,9 @@ abstract class Filter
*
* @since 1.3.0
*/
-case class EqualTo(attribute: String, value: Any) extends Filter
+case class EqualTo(attribute: String, value: Any) extends Filter {
+ override def references: Array[String] = Array(attribute) ++ findReferences(value)
+}
/**
* Performs equality comparison, similar to [[EqualTo]]. However, this differs from [[EqualTo]]
@@ -43,7 +56,9 @@ case class EqualTo(attribute: String, value: Any) extends Filter
*
* @since 1.5.0
*/
-case class EqualNullSafe(attribute: String, value: Any) extends Filter
+case class EqualNullSafe(attribute: String, value: Any) extends Filter {
+ override def references: Array[String] = Array(attribute) ++ findReferences(value)
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to a value
@@ -51,7 +66,9 @@ case class EqualNullSafe(attribute: String, value: Any) extends Filter
*
* @since 1.3.0
*/
-case class GreaterThan(attribute: String, value: Any) extends Filter
+case class GreaterThan(attribute: String, value: Any) extends Filter {
+ override def references: Array[String] = Array(attribute) ++ findReferences(value)
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to a value
@@ -59,7 +76,9 @@ case class GreaterThan(attribute: String, value: Any) extends Filter
*
* @since 1.3.0
*/
-case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
+case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter {
+ override def references: Array[String] = Array(attribute) ++ findReferences(value)
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to a value
@@ -67,7 +86,9 @@ case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
*
* @since 1.3.0
*/
-case class LessThan(attribute: String, value: Any) extends Filter
+case class LessThan(attribute: String, value: Any) extends Filter {
+ override def references: Array[String] = Array(attribute) ++ findReferences(value)
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to a value
@@ -75,7 +96,9 @@ case class LessThan(attribute: String, value: Any) extends Filter
*
* @since 1.3.0
*/
-case class LessThanOrEqual(attribute: String, value: Any) extends Filter
+case class LessThanOrEqual(attribute: String, value: Any) extends Filter {
+ override def references: Array[String] = Array(attribute) ++ findReferences(value)
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to one of the values in the array.
@@ -99,6 +122,8 @@ case class In(attribute: String, values: Array[Any]) extends Filter {
override def toString: String = {
s"In($attribute, [${values.mkString(",")}]"
}
+
+ override def references: Array[String] = Array(attribute) ++ values.flatMap(findReferences)
}
/**
@@ -106,35 +131,45 @@ case class In(attribute: String, values: Array[Any]) extends Filter {
*
* @since 1.3.0
*/
-case class IsNull(attribute: String) extends Filter
+case class IsNull(attribute: String) extends Filter {
+ override def references: Array[String] = Array(attribute)
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to a non-null value.
*
* @since 1.3.0
*/
-case class IsNotNull(attribute: String) extends Filter
+case class IsNotNull(attribute: String) extends Filter {
+ override def references: Array[String] = Array(attribute)
+}
/**
* A filter that evaluates to `true` iff both `left` or `right` evaluate to `true`.
*
* @since 1.3.0
*/
-case class And(left: Filter, right: Filter) extends Filter
+case class And(left: Filter, right: Filter) extends Filter {
+ override def references: Array[String] = left.references ++ right.references
+}
/**
* A filter that evaluates to `true` iff at least one of `left` or `right` evaluates to `true`.
*
* @since 1.3.0
*/
-case class Or(left: Filter, right: Filter) extends Filter
+case class Or(left: Filter, right: Filter) extends Filter {
+ override def references: Array[String] = left.references ++ right.references
+}
/**
* A filter that evaluates to `true` iff `child` is evaluated to `false`.
*
* @since 1.3.0
*/
-case class Not(child: Filter) extends Filter
+case class Not(child: Filter) extends Filter {
+ override def references: Array[String] = child.references
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to
@@ -142,7 +177,9 @@ case class Not(child: Filter) extends Filter
*
* @since 1.3.1
*/
-case class StringStartsWith(attribute: String, value: String) extends Filter
+case class StringStartsWith(attribute: String, value: String) extends Filter {
+ override def references: Array[String] = Array(attribute)
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to
@@ -150,7 +187,9 @@ case class StringStartsWith(attribute: String, value: String) extends Filter
*
* @since 1.3.1
*/
-case class StringEndsWith(attribute: String, value: String) extends Filter
+case class StringEndsWith(attribute: String, value: String) extends Filter {
+ override def references: Array[String] = Array(attribute)
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to
@@ -158,4 +197,6 @@ case class StringEndsWith(attribute: String, value: String) extends Filter
*
* @since 1.3.1
*/
-case class StringContains(attribute: String, value: String) extends Filter
+case class StringContains(attribute: String, value: String) extends Filter {
+ override def references: Array[String] = Array(attribute)
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FiltersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FiltersSuite.scala
new file mode 100644
index 0000000000..1cb7a2156c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FiltersSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Unit test suites for data source filters.
+ */
+class FiltersSuite extends SparkFunSuite {
+
+ test("EqualTo references") {
+ assert(EqualTo("a", "1").references.toSeq == Seq("a"))
+ assert(EqualTo("a", EqualTo("b", "2")).references.toSeq == Seq("a", "b"))
+ }
+
+ test("EqualNullSafe references") {
+ assert(EqualNullSafe("a", "1").references.toSeq == Seq("a"))
+ assert(EqualNullSafe("a", EqualTo("b", "2")).references.toSeq == Seq("a", "b"))
+ }
+
+ test("GreaterThan references") {
+ assert(GreaterThan("a", "1").references.toSeq == Seq("a"))
+ assert(GreaterThan("a", EqualTo("b", "2")).references.toSeq == Seq("a", "b"))
+ }
+
+ test("GreaterThanOrEqual references") {
+ assert(GreaterThanOrEqual("a", "1").references.toSeq == Seq("a"))
+ assert(GreaterThanOrEqual("a", EqualTo("b", "2")).references.toSeq == Seq("a", "b"))
+ }
+
+ test("LessThan references") {
+ assert(LessThan("a", "1").references.toSeq == Seq("a"))
+ assert(LessThan("a", EqualTo("b", "2")).references.toSeq == Seq("a", "b"))
+ }
+
+ test("LessThanOrEqual references") {
+ assert(LessThanOrEqual("a", "1").references.toSeq == Seq("a"))
+ assert(LessThanOrEqual("a", EqualTo("b", "2")).references.toSeq == Seq("a", "b"))
+ }
+
+ test("In references") {
+ assert(In("a", Array("1")).references.toSeq == Seq("a"))
+ assert(In("a", Array("1", EqualTo("b", "2"))).references.toSeq == Seq("a", "b"))
+ }
+
+ test("IsNull references") {
+ assert(IsNull("a").references.toSeq == Seq("a"))
+ }
+
+ test("IsNotNull references") {
+ assert(IsNotNull("a").references.toSeq == Seq("a"))
+ }
+
+ test("And references") {
+ assert(And(EqualTo("a", "1"), EqualTo("b", "1")).references.toSeq == Seq("a", "b"))
+ }
+
+ test("Or references") {
+ assert(Or(EqualTo("a", "1"), EqualTo("b", "1")).references.toSeq == Seq("a", "b"))
+ }
+
+ test("StringStartsWith references") {
+ assert(StringStartsWith("a", "str").references.toSeq == Seq("a"))
+ }
+
+ test("StringEndsWith references") {
+ assert(StringEndsWith("a", "str").references.toSeq == Seq("a"))
+ }
+
+ test("StringContains references") {
+ assert(StringContains("a", "str").references.toSeq == Seq("a"))
+ }
+}