diff options
author | Yanjie Gao <gaoyanjie55@163.com> | 2014-07-04 02:43:57 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-07-04 02:43:57 -0700 |
commit | 5dadda86456e1d3918e320b83aec7e2f1352d95d (patch) | |
tree | 7c8abffc8cb5e6ec059442f89565d31e423062ef /sql | |
parent | b3e768e154bd7175db44c3ffc3d8f783f15ab776 (diff) | |
download | spark-5dadda86456e1d3918e320b83aec7e2f1352d95d.tar.gz spark-5dadda86456e1d3918e320b83aec7e2f1352d95d.tar.bz2 spark-5dadda86456e1d3918e320b83aec7e2f1352d95d.zip |
[SPARK-2234][SQL]Spark SQL basicOperators add Except operator
Hi all,
I want to submit a Except operator in basicOperators.scala
In SQL case.SQL support two table do except operator.
select * from table1
except
select * from table2
This operator support the substract function .Return an table with the elements from `this` that are not in `other`.This operator should limit the input SparkPlan Seq only has two member.The check will later support
JIRA:https://issues.apache.org/jira/browse/SPARK-2234
Author: Yanjie Gao <gaoyanjie55@163.com>
Author: YanjieGao <396154235@qq.com>
Author: root <root@node4.(none)>
Author: gaoyanjie <gaoyanjie55@163.com>
Closes #1151 from YanjieGao/patch-6 and squashes the following commits:
f19f899 [YanjieGao] add a new blank line in basicoperators.scala
2ff7d73 [YanjieGao] resolve the identation in SqlParser and SparkStrategies
fdb5227 [YanjieGao] Merge remote branch 'upstream/master' into patch-6
9940d19 [YanjieGao] make comment less than 100c
09c7413 [YanjieGao] pr 1151 SqlParser add cache ,basic Operator rename Except and modify comment
b4b5867 [root] Merge remote branch 'upstream/master' into patch-6
b4c3869 [Yanjie Gao] change SparkStrategies Sparkcontext to SqlContext
7e0ec29 [Yanjie Gao] delete multi test
7e7c83f [Yanjie Gao] delete conflict except
b01beb8 [YanjieGao] resolve conflict sparkstrategies and basicOperators
4dc8166 [YanjieGao] resolve conflict
fa68a98 [Yanjie Gao] Update joins.scala
8e6bb00 [Yanjie Gao] delete conflict except
dd9ba5e [Yanjie Gao] Update joins.scala
a0d4e73 [Yanjie Gao] delete skew join
60f5ddd [Yanjie Gao] update less than 100c
0e72233 [Yanjie Gao] update SQLQuerySuite on master branch
7f916b5 [Yanjie Gao] update execution/basicOperators on master branch
a28dece [Yanjie Gao] Update logical/basicOperators on master branch
a639935 [Yanjie Gao] Update SparkStrategies.scala
3bf7def [Yanjie Gao] update SqlParser on master branch
26f833f [Yanjie Gao] update SparkStrategies.scala on master branch
8dd063f [Yanjie Gao] Update logical/basicOperators on master branch
9847dcf [Yanjie Gao] update SqlParser on masterbranch
d6a4604 [Yanjie Gao] Update joins.scala
424c507 [Yanjie Gao] Update joins.scala
7680742 [Yanjie Gao] Update SqlParser.scala
a7193d8 [gaoyanjie] [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151
5c8a224 [Yanjie Gao] update the line less than 100c
ee066b3 [Yanjie Gao] Update basicOperators.scala
32a80ab [Yanjie Gao] remove except in HiveQl
cf232eb [Yanjie Gao] update 1comment 2space3 left.out
f1ea3f3 [Yanjie Gao] remove comment
7ea9b91 [Yanjie Gao] remove annotation
7f3d613 [Yanjie Gao] update .map(_.copy())
670a1bb [Yanjie Gao] Update HiveQl.scala
3fe7746 [Yanjie Gao] Update SQLQuerySuite.scala
a36eb0a [Yanjie Gao] Update basicOperators.scala
7859e56 [Yanjie Gao] Update SparkStrategies.scala
052346d [Yanjie Gao] Subtract is conflict with Subtract(e1,e2)
aab3785 [Yanjie Gao] Update SQLQuerySuite.scala
4bf80b1 [Yanjie Gao] update subtract to except
4bdd520 [Yanjie Gao] Update SqlParser.scala
2d4bfbd [Yanjie Gao] Update SQLQuerySuite.scala
0808921 [Yanjie Gao] SQLQuerySuite
a8a1948 [Yanjie Gao] SparkStrategies
1fe96c0 [Yanjie Gao] HiveQl.scala update
3305e40 [Yanjie Gao] SqlParser
7a98c37 [Yanjie Gao] Update basicOperators.scala
cf5b9d0 [Yanjie Gao] Update basicOperators.scala
8945835 [Yanjie Gao] object SkewJoin extends Strategy
2b98962 [Yanjie Gao] Update SqlParser.scala
dd32980 [Yanjie Gao] update1
68815b2 [Yanjie Gao] Reformat the code style
4eb43ec [Yanjie Gao] Update basicOperators.scala
aa06072 [Yanjie Gao] Reformat the code sytle
Diffstat (limited to 'sql')
5 files changed, 40 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 61762fa2a7..ecb1112955 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -118,6 +118,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") + protected val EXCEPT = Keyword("EXCEPT") + // Use reflection to find the reserved words defined in this class. protected val reservedWords = @@ -138,6 +140,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | + EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert | cache diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 3e0639867b..bac5a72464 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -89,6 +89,12 @@ case class Join( } } +case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + def output = left.output + + def references = Set.empty +} + case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0925605b7c..9e036e127b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -273,6 +273,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil + case logical.Except(left,right) => + execution.Except(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index a278f1ca98..4b59e0b4e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -205,3 +205,18 @@ object ExistingRdd { case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { override def execute() = rdd } + +/** + * :: DeveloperApi :: + * Returns a table with the elements from left that are not in right using + * the built-in spark subtract function. + */ +@DeveloperApi +case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { + override def output = left.output + + override def execute() = { + left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) + } +} + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 2c1cb18670..5c6701e203 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -371,6 +371,20 @@ class SQLQuerySuite extends QueryTest { (3, null))) } + test("EXCEPT") { + + checkAnswer( + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil) + checkAnswer( + sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) + } + test("SET commands semantics using sql()") { TestSQLContext.settings.synchronized { clear() |