author     Reynold Xin <rxin@databricks.com>  2015-03-31 00:19:51 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-03-31 00:19:51 -0700
commit     f07e714062f02feadff10a45f9b9061444bb8ec5 (patch)
tree       a05f7aa2609916dfb27993adae90e729dfb0578c /sql
parent     56775571cb938c819e5f7c3d49c5dd416ed034cb (diff)
[SPARK-6625][SQL] Add common string filters to data sources.
Filters such as startsWith, endsWith, and contains will be very useful for data sources that provide search functionality, e.g. Succinct, Elasticsearch, Solr. I also took this chance to improve documentation for the data source filters.

Author: Reynold Xin <rxin@databricks.com>

Closes #5285 from rxin/ds-string-filters and squashes the following commits:

f021727 [Reynold Xin] Fixed grammar.
7695a52 [Reynold Xin] [SPARK-6625][SQL] Add common string filters to data sources.
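For readers unfamiliar with the data source API, here is a minimal sketch (not part of this commit) of how a search-backed relation might act on the new string filters. `SearchBackend` and its methods are hypothetical stand-ins for an engine such as Elasticsearch or Solr, and column pruning is ignored for brevity:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources._
    import org.apache.spark.sql.types._

    // Hypothetical external search service; this API is an assumption made
    // for the sketch, not part of this patch.
    trait SearchBackend extends Serializable {
      def prefixQuery(field: String, prefix: String): RDD[Row]
      def scanAll(): RDD[Row]
    }

    case class SearchRelation(backend: SearchBackend)(@transient val sqlContext: SQLContext)
      extends BaseRelation with PrunedFilteredScan {

      override def schema: StructType =
        StructType(StructField("c", StringType, nullable = false) :: Nil)

      override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        // Push the first usable prefix filter to the backend; Spark re-evaluates
        // all filters afterwards, so pushdown here is purely an optimization.
        filters.collectFirst {
          case StringStartsWith("c", prefix) => backend.prefixQuery("c", prefix)
        }.getOrElse(backend.scanAll())
      }
    }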
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala            | 69
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala         |  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala  | 73
4 files changed, 133 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 67f3507c61..83b603a4bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Row, Strategy, execution, sources}
/**
@@ -166,6 +167,15 @@ private[sql] object DataSourceStrategy extends Strategy {
case expressions.Not(child) =>
translate(child).map(sources.Not)
+ case expressions.StartsWith(a: Attribute, Literal(v: String, StringType)) =>
+ Some(sources.StringStartsWith(a.name, v))
+
+ case expressions.EndsWith(a: Attribute, Literal(v: String, StringType)) =>
+ Some(sources.StringEndsWith(a.name, v))
+
+ case expressions.Contains(a: Attribute, Literal(v: String, StringType)) =>
+ Some(sources.StringContains(a.name, v))
+
case _ => None
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 1e4505e36d..791046e007 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -17,16 +17,85 @@
package org.apache.spark.sql.sources
+/**
+ * A filter predicate for data sources.
+ */
abstract class Filter
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * equal to `value`.
+ */
case class EqualTo(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * greater than `value`.
+ */
case class GreaterThan(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * greater than or equal to `value`.
+ */
case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * less than `value`.
+ */
case class LessThan(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * less than or equal to `value`.
+ */
case class LessThanOrEqual(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to one of the values in the array.
+ */
case class In(attribute: String, values: Array[Any]) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to null.
+ */
case class IsNull(attribute: String) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a non-null value.
+ */
case class IsNotNull(attribute: String) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff both `left` and `right` evaluate to `true`.
+ */
case class And(left: Filter, right: Filter) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff at least one of `left` or `right` evaluates to `true`.
+ */
case class Or(left: Filter, right: Filter) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff `child` evaluates to `false`.
+ */
case class Not(child: Filter) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to
+ * a string that starts with `value`.
+ */
+case class StringStartsWith(attribute: String, value: String) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to
+ * a string that ends with `value`.
+ */
+case class StringEndsWith(attribute: String, value: String) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to
+ * a string that contains the string `value`.
+ */
+case class StringContains(attribute: String, value: String) extends Filter
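As an aside, a connector that talks to an external engine can compile these case classes into a native query; a hedged sketch of such a translation, where the Lucene-style query syntax is an assumption and escaping is glossed over:

    import org.apache.spark.sql.sources._

    // Sketch: turn a pushed-down filter into a query fragment for an external
    // engine. Returning None means the filter cannot be pushed and will only
    // be evaluated by Spark.
    def toQueryFragment(f: Filter): Option[String] = f match {
      case StringStartsWith(attr, v) => Some(s"$attr:$v*")
      case StringEndsWith(attr, v)   => Some(s"$attr:*$v")
      case StringContains(attr, v)   => Some(s"$attr:*$v*")
      case And(l, r) =>
        for (ls <- toQueryFragment(l); rs <- toQueryFragment(r)) yield s"($ls AND $rs)"
      case _ => None
    }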
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a046a48c17..8f9946a5a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -152,6 +152,9 @@ trait PrunedScan {
* A BaseRelation that can eliminate unneeded columns and filter using selected
* predicates before producing an RDD containing all matching tuples as Row objects.
*
+ * The actual filter should be the conjunction of all `filters`,
+ * i.e. they should be ANDed together.
+ *
* The pushed down filters are currently purely an optimization as they will all be evaluated
* again. This means it is safe to use them with methods that produce false positives such
* as filtering partitions based on a bloom filter.
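In other words, a row matches only if every pushed filter holds. A one-line sketch of that conjunction, where `evalOne` is a hypothetical per-filter predicate supplied by the relation (the test suite's `eval` below does essentially this):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.Filter

    // Pushed-down filters are implicitly ANDed: all must evaluate to true.
    def matches(row: Row, filters: Array[Filter], evalOne: (Row, Filter) => Boolean): Boolean =
      filters.forall(f => evalOne(row, f))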
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index ffeccf0b69..72ddc0ea2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -35,20 +35,23 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQL
extends BaseRelation
with PrunedFilteredScan {
- override def schema =
+ override def schema: StructType =
StructType(
StructField("a", IntegerType, nullable = false) ::
- StructField("b", IntegerType, nullable = false) :: Nil)
+ StructField("b", IntegerType, nullable = false) ::
+ StructField("c", StringType, nullable = false) :: Nil)
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = {
val rowBuilders = requiredColumns.map {
case "a" => (i: Int) => Seq(i)
case "b" => (i: Int) => Seq(i * 2)
+ case "c" => (i: Int) => Seq((i - 1 + 'a').toChar.toString * 10)
}
FiltersPushed.list = filters
- def translateFilter(filter: Filter): Int => Boolean = filter match {
+ // Predicate test on integer column
+ def translateFilterOnA(filter: Filter): Int => Boolean = filter match {
case EqualTo("a", v) => (a: Int) => a == v
case LessThan("a", v: Int) => (a: Int) => a < v
case LessThanOrEqual("a", v: Int) => (a: Int) => a <= v
@@ -57,13 +60,27 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQL
case In("a", values) => (a: Int) => values.map(_.asInstanceOf[Int]).toSet.contains(a)
case IsNull("a") => (a: Int) => false // Int can't be null
case IsNotNull("a") => (a: Int) => true
- case Not(pred) => (a: Int) => !translateFilter(pred)(a)
- case And(left, right) => (a: Int) => translateFilter(left)(a) && translateFilter(right)(a)
- case Or(left, right) => (a: Int) => translateFilter(left)(a) || translateFilter(right)(a)
+ case Not(pred) => (a: Int) => !translateFilterOnA(pred)(a)
+ case And(left, right) => (a: Int) =>
+ translateFilterOnA(left)(a) && translateFilterOnA(right)(a)
+ case Or(left, right) => (a: Int) =>
+ translateFilterOnA(left)(a) || translateFilterOnA(right)(a)
case _ => (a: Int) => true
}
- def eval(a: Int) = !filters.map(translateFilter(_)(a)).contains(false)
+ // Predicate test on string column
+ def translateFilterOnC(filter: Filter): String => Boolean = filter match {
+ case StringStartsWith("c", v) => _.startsWith(v)
+ case StringEndsWith("c", v) => _.endsWith(v)
+ case StringContains("c", v) => _.contains(v)
+ case _ => (c: String) => true
+ }
+
+ def eval(a: Int) = {
+ val c = (a - 1 + 'a').toChar.toString * 10
+ !filters.map(translateFilterOnA(_)(a)).contains(false) &&
+ !filters.map(translateFilterOnC(_)(c)).contains(false)
+ }
sqlContext.sparkContext.parallelize(from to to).filter(eval).map(i =>
Row.fromSeq(rowBuilders.map(_(i)).reduceOption(_ ++ _).getOrElse(Seq.empty)))
@@ -93,7 +110,7 @@ class FilteredScanSuite extends DataSourceTest {
sqlTest(
"SELECT * FROM oneToTenFiltered",
- (1 to 10).map(i => Row(i, i * 2)).toSeq)
+ (1 to 10).map(i => Row(i, i * 2, (i - 1 + 'a').toChar.toString * 10)).toSeq)
sqlTest(
"SELECT a, b FROM oneToTenFiltered",
@@ -128,41 +145,53 @@ class FilteredScanSuite extends DataSourceTest {
(2 to 10 by 2).map(i => Row(i, i)).toSeq)
sqlTest(
- "SELECT * FROM oneToTenFiltered WHERE a = 1",
- Seq(1).map(i => Row(i, i * 2)).toSeq)
+ "SELECT a, b FROM oneToTenFiltered WHERE a = 1",
+ Seq(1).map(i => Row(i, i * 2)))
sqlTest(
- "SELECT * FROM oneToTenFiltered WHERE a IN (1,3,5)",
- Seq(1,3,5).map(i => Row(i, i * 2)).toSeq)
+ "SELECT a, b FROM oneToTenFiltered WHERE a IN (1,3,5)",
+ Seq(1,3,5).map(i => Row(i, i * 2)))
sqlTest(
- "SELECT * FROM oneToTenFiltered WHERE A = 1",
- Seq(1).map(i => Row(i, i * 2)).toSeq)
+ "SELECT a, b FROM oneToTenFiltered WHERE A = 1",
+ Seq(1).map(i => Row(i, i * 2)))
sqlTest(
- "SELECT * FROM oneToTenFiltered WHERE b = 2",
- Seq(1).map(i => Row(i, i * 2)).toSeq)
+ "SELECT a, b FROM oneToTenFiltered WHERE b = 2",
+ Seq(1).map(i => Row(i, i * 2)))
sqlTest(
- "SELECT * FROM oneToTenFiltered WHERE a IS NULL",
+ "SELECT a, b FROM oneToTenFiltered WHERE a IS NULL",
Seq.empty[Row])
sqlTest(
- "SELECT * FROM oneToTenFiltered WHERE a IS NOT NULL",
+ "SELECT a, b FROM oneToTenFiltered WHERE a IS NOT NULL",
(1 to 10).map(i => Row(i, i * 2)).toSeq)
sqlTest(
- "SELECT * FROM oneToTenFiltered WHERE a < 5 AND a > 1",
+ "SELECT a, b FROM oneToTenFiltered WHERE a < 5 AND a > 1",
(2 to 4).map(i => Row(i, i * 2)).toSeq)
sqlTest(
- "SELECT * FROM oneToTenFiltered WHERE a < 3 OR a > 8",
- Seq(1, 2, 9, 10).map(i => Row(i, i * 2)).toSeq)
+ "SELECT a, b FROM oneToTenFiltered WHERE a < 3 OR a > 8",
+ Seq(1, 2, 9, 10).map(i => Row(i, i * 2)))
sqlTest(
- "SELECT * FROM oneToTenFiltered WHERE NOT (a < 6)",
+ "SELECT a, b FROM oneToTenFiltered WHERE NOT (a < 6)",
(6 to 10).map(i => Row(i, i * 2)).toSeq)
+ sqlTest(
+ "SELECT a, b, c FROM oneToTenFiltered WHERE c like 'c%'",
+ Seq(Row(3, 3 * 2, "c" * 10)))
+
+ sqlTest(
+ "SELECT a, b, c FROM oneToTenFiltered WHERE c like 'd%'",
+ Seq(Row(4, 4 * 2, "d" * 10)))
+
+ sqlTest(
+ "SELECT a, b, c FROM oneToTenFiltered WHERE c like '%e%'",
+ Seq(Row(5, 5 * 2, "e" * 10)))
+
testPushDown("SELECT * FROM oneToTenFiltered WHERE A = 1", 1)
testPushDown("SELECT a FROM oneToTenFiltered WHERE A = 1", 1)
testPushDown("SELECT b FROM oneToTenFiltered WHERE A = 1", 1)