aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYanjie Gao <gaoyanjie55@163.com>2014-07-07 19:40:04 -0700
committerMichael Armbrust <michael@databricks.com>2014-07-07 19:40:04 -0700
commit50561f4396be7d641feb2a7a54a374d294628231 (patch)
treec0256e1404835df2fcd784098bccdcd3af6479fa /sql
parent4352a2fdaa64efee7158eabef65703460ff284ec (diff)
downloadspark-50561f4396be7d641feb2a7a54a374d294628231.tar.gz
spark-50561f4396be7d641feb2a7a54a374d294628231.tar.bz2
spark-50561f4396be7d641feb2a7a54a374d294628231.zip
[SPARK-2235][SQL]Spark SQL basicOperator add Intersect operator
Hi all, I want to submit a basic operator Intersect For example , in sql case select * from table1 intersect select * from table2 So ,i want use this operator support this function in Spark SQL This operator will return the the intersection of SparkPlan child table RDD . JIRA:https://issues.apache.org/jira/browse/SPARK-2235 Author: Yanjie Gao <gaoyanjie55@163.com> Author: YanjieGao <396154235@qq.com> Closes #1150 from YanjieGao/patch-5 and squashes the following commits: 4629afe [YanjieGao] reformat the code bdc2ac0 [YanjieGao] reformat the code as Michael's suggestion 3b29ad6 [YanjieGao] Merge remote branch 'upstream/master' into patch-5 1cfbfe6 [YanjieGao] refomat some files ea78f33 [YanjieGao] resolve conflict and add annotation on basicOperator and remove HiveQl 0c7cca5 [YanjieGao] modify format problem a802ca8 [YanjieGao] Merge remote branch 'upstream/master' into patch-5 5e374c7 [YanjieGao] resolve conflict in SparkStrategies and basicOperator f7961f6 [Yanjie Gao] update the line less than bdc4a05 [Yanjie Gao] Update basicOperators.scala 0b49837 [Yanjie Gao] delete the annotation f1288b4 [Yanjie Gao] delete annotation e2b64be [Yanjie Gao] Update basicOperators.scala 4dd453e [Yanjie Gao] Update SQLQuerySuite.scala 790765d [Yanjie Gao] Update SparkStrategies.scala ac73e60 [Yanjie Gao] Update basicOperators.scala d4ac5e5 [Yanjie Gao] Update HiveQl.scala 61e88e7 [Yanjie Gao] Update SqlParser.scala 469f099 [Yanjie Gao] Update basicOperators.scala e5bff61 [Yanjie Gao] Spark SQL basicOperator add Intersect operator
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala13
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala11
5 files changed, 33 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index ecb1112955..e5653c5b14 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -118,6 +118,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val UNCACHE = Keyword("UNCACHE")
protected val UNION = Keyword("UNION")
protected val WHERE = Keyword("WHERE")
+ protected val INTERSECT = Keyword("INTERSECT")
protected val EXCEPT = Keyword("EXCEPT")
@@ -140,6 +141,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected lazy val query: Parser[LogicalPlan] = (
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
+ INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 0728fa73fb..1537de259c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -210,3 +210,8 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
case object NoRelation extends LeafNode {
override def output = Nil
}
+
+case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+ override def output = left.output
+ override def references = Set.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9e036e127b..7080074a69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -275,6 +275,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
case logical.Except(left,right) =>
execution.Except(planLater(left),planLater(right)) :: Nil
+ case logical.Intersect(left, right) =>
+ execution.Intersect(planLater(left), planLater(right)) :: Nil
case logical.Generate(generator, join, outer, _, child) =>
execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
case logical.NoRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4b59e0b4e5..e8816f0b3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -220,3 +220,16 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
}
}
+/**
+ * :: DeveloperApi ::
+ * Returns the rows in left that also appear in right using the built in spark
+ * intersection function.
+ */
+@DeveloperApi
+case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
+ override def output = children.head.output
+
+ override def execute() = {
+ left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 5c6701e203..fa1f32f8a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -385,6 +385,17 @@ class SQLQuerySuite extends QueryTest {
sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil)
}
+ test("INTERSECT") {
+ checkAnswer(
+ sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData"),
+ (1, "a") ::
+ (2, "b") ::
+ (3, "c") ::
+ (4, "d") :: Nil)
+ checkAnswer(
+ sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData"), Nil)
+ }
+
test("SET commands semantics using sql()") {
TestSQLContext.settings.synchronized {
clear()