aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala13
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala11
5 files changed, 33 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index ecb1112955..e5653c5b14 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -118,6 +118,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val UNCACHE = Keyword("UNCACHE")
protected val UNION = Keyword("UNION")
protected val WHERE = Keyword("WHERE")
+ protected val INTERSECT = Keyword("INTERSECT")
protected val EXCEPT = Keyword("EXCEPT")
@@ -140,6 +141,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected lazy val query: Parser[LogicalPlan] = (
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
+ INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 0728fa73fb..1537de259c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -210,3 +210,8 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
case object NoRelation extends LeafNode {
override def output = Nil
}
+
+case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+ override def output = left.output
+ override def references = Set.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9e036e127b..7080074a69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -275,6 +275,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
case logical.Except(left,right) =>
execution.Except(planLater(left),planLater(right)) :: Nil
+ case logical.Intersect(left, right) =>
+ execution.Intersect(planLater(left), planLater(right)) :: Nil
case logical.Generate(generator, join, outer, _, child) =>
execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
case logical.NoRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4b59e0b4e5..e8816f0b3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -220,3 +220,16 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
}
}
+/**
+ * :: DeveloperApi ::
+ * Returns the rows in left that also appear in right, using the built-in Spark
+ * intersection function.
+ */
+@DeveloperApi
+case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
+ override def output = children.head.output
+
+ override def execute() = {
+ left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 5c6701e203..fa1f32f8a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -385,6 +385,17 @@ class SQLQuerySuite extends QueryTest {
sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil)
}
+ test("INTERSECT") {
+ checkAnswer(
+ sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData"),
+ (1, "a") ::
+ (2, "b") ::
+ (3, "c") ::
+ (4, "d") :: Nil)
+ checkAnswer(
+ sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData"), Nil)
+ }
+
test("SET commands semantics using sql()") {
TestSQLContext.settings.synchronized {
clear()