diff options
author | Yanjie Gao <gaoyanjie55@163.com> | 2014-07-07 19:40:04 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-07-07 19:40:04 -0700 |
commit | 50561f4396be7d641feb2a7a54a374d294628231 (patch) | |
tree | c0256e1404835df2fcd784098bccdcd3af6479fa | |
parent | 4352a2fdaa64efee7158eabef65703460ff284ec (diff) | |
download | spark-50561f4396be7d641feb2a7a54a374d294628231.tar.gz spark-50561f4396be7d641feb2a7a54a374d294628231.tar.bz2 spark-50561f4396be7d641feb2a7a54a374d294628231.zip |
[SPARK-2235][SQL]Spark SQL basicOperator add Intersect operator
Hi all,
I want to submit a basic operator Intersect
For example , in sql case
select * from table1
intersect
select * from table2
So ,i want use this operator support this function in Spark SQL
This operator will return the the intersection of SparkPlan child table RDD .
JIRA:https://issues.apache.org/jira/browse/SPARK-2235
Author: Yanjie Gao <gaoyanjie55@163.com>
Author: YanjieGao <396154235@qq.com>
Closes #1150 from YanjieGao/patch-5 and squashes the following commits:
4629afe [YanjieGao] reformat the code
bdc2ac0 [YanjieGao] reformat the code as Michael's suggestion
3b29ad6 [YanjieGao] Merge remote branch 'upstream/master' into patch-5
1cfbfe6 [YanjieGao] refomat some files
ea78f33 [YanjieGao] resolve conflict and add annotation on basicOperator and remove HiveQl
0c7cca5 [YanjieGao] modify format problem
a802ca8 [YanjieGao] Merge remote branch 'upstream/master' into patch-5
5e374c7 [YanjieGao] resolve conflict in SparkStrategies and basicOperator
f7961f6 [Yanjie Gao] update the line less than
bdc4a05 [Yanjie Gao] Update basicOperators.scala
0b49837 [Yanjie Gao] delete the annotation
f1288b4 [Yanjie Gao] delete annotation
e2b64be [Yanjie Gao] Update basicOperators.scala
4dd453e [Yanjie Gao] Update SQLQuerySuite.scala
790765d [Yanjie Gao] Update SparkStrategies.scala
ac73e60 [Yanjie Gao] Update basicOperators.scala
d4ac5e5 [Yanjie Gao] Update HiveQl.scala
61e88e7 [Yanjie Gao] Update SqlParser.scala
469f099 [Yanjie Gao] Update basicOperators.scala
e5bff61 [Yanjie Gao] Spark SQL basicOperator add Intersect operator
5 files changed, 33 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index ecb1112955..e5653c5b14 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -118,6 +118,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") + protected val INTERSECT = Keyword("INTERSECT") protected val EXCEPT = Keyword("EXCEPT") @@ -140,6 +141,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | + INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 0728fa73fb..1537de259c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -210,3 +210,8 @@ case class Distinct(child: LogicalPlan) extends UnaryNode { case object NoRelation extends LeafNode { override def output = Nil } + +case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + override def output = left.output + override def references = Set.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 9e036e127b..7080074a69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -275,6 +275,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil case logical.Except(left,right) => execution.Except(planLater(left),planLater(right)) :: Nil + case logical.Intersect(left, right) => + execution.Intersect(planLater(left), planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 4b59e0b4e5..e8816f0b3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -220,3 +220,16 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { } } +/** + * :: DeveloperApi :: + * Returns the rows in left that also appear in right using the built in spark + * intersection function. + */ +@DeveloperApi +case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { + override def output = children.head.output + + override def execute() = { + left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 5c6701e203..fa1f32f8a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -385,6 +385,17 @@ class SQLQuerySuite extends QueryTest { sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) } + test("INTERSECT") { + checkAnswer( + sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData"), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData"), Nil) + } + test("SET commands semantics using sql()") { TestSQLContext.settings.synchronized { clear() |