aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-03-16 10:52:36 -0700
committerReynold Xin <rxin@databricks.com>2016-03-16 10:52:36 -0700
commitd9e8f26d0334f393e3b02d7a3b607be54a2a5efe (patch)
tree9790f502141132cf03c860411031f59cacd366ae /sql/core/src
parenteacd9d8eda68260bbda7b0cd07410321dffaf428 (diff)
downloadspark-d9e8f26d0334f393e3b02d7a3b607be54a2a5efe.tar.gz
spark-d9e8f26d0334f393e3b02d7a3b607be54a2a5efe.tar.bz2
spark-d9e8f26d0334f393e3b02d7a3b607be54a2a5efe.zip
[SPARK-13924][SQL] officially support multi-insert
## What changes were proposed in this pull request? There is a feature of hive SQL called multi-insert. For example: ``` FROM src INSERT OVERWRITE TABLE dest1 SELECT key + 1 INSERT OVERWRITE TABLE dest2 SELECT key WHERE key > 2 INSERT OVERWRITE TABLE dest3 SELECT col EXPLODE(arr) exp AS col ... ``` We partially support it currently, with some limitations: 1) WHERE can't reference columns produced by LATERAL VIEW. 2) It's not executed eagerly, i.e. `sql("...multi-insert clause...")` won't take place right away like other commands, e.g. CREATE TABLE. This PR removes these limitations and make us fully support multi-insert. ## How was this patch tested? new tests in `SQLQuerySuite` Author: Wenchen Fan <wenchen@databricks.com> Closes #11754 from cloud-fan/lateral-view.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala27
1 files changed, 18 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 969fcdf428..ac2ca3c5a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -134,15 +134,24 @@ class Dataset[T] private[sql](
this(sqlContext, sqlContext.executePlan(logicalPlan), encoder)
}
- @transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match {
- // For various commands (like DDL) and queries with side effects, we force query optimization to
- // happen right away to let these side effects take place eagerly.
- case _: Command |
- _: InsertIntoTable |
- _: CreateTableUsingAsSelect =>
- LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
- case _ =>
- queryExecution.analyzed
+ @transient protected[sql] val logicalPlan: LogicalPlan = {
+ def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
+ case _: Command |
+ _: InsertIntoTable |
+ _: CreateTableUsingAsSelect => true
+ case _ => false
+ }
+
+ queryExecution.logical match {
+ // For various commands (like DDL) and queries with side effects, we force query execution
+ // to happen right away to let these side effects take place eagerly.
+ case p if hasSideEffects(p) =>
+ LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
+ case Union(children) if children.forall(hasSideEffects) =>
+ LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
+ case _ =>
+ queryExecution.analyzed
+ }
}
/**