diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-03-16 10:52:36 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-16 10:52:36 -0700 |
commit | d9e8f26d0334f393e3b02d7a3b607be54a2a5efe (patch) | |
tree | 9790f502141132cf03c860411031f59cacd366ae /sql/core | |
parent | eacd9d8eda68260bbda7b0cd07410321dffaf428 (diff) | |
download | spark-d9e8f26d0334f393e3b02d7a3b607be54a2a5efe.tar.gz spark-d9e8f26d0334f393e3b02d7a3b607be54a2a5efe.tar.bz2 spark-d9e8f26d0334f393e3b02d7a3b607be54a2a5efe.zip |
[SPARK-13924][SQL] officially support multi-insert
## What changes were proposed in this pull request?
There is a feature of hive SQL called multi-insert. For example:
```
FROM src
INSERT OVERWRITE TABLE dest1
SELECT key + 1
INSERT OVERWRITE TABLE dest2
SELECT key WHERE key > 2
INSERT OVERWRITE TABLE dest3
SELECT col EXPLODE(arr) exp AS col
...
```
We partially support it currently, with some limitations: 1) WHERE can't reference columns produced by LATERAL VIEW. 2) It's not executed eagerly, i.e. `sql("...multi-insert clause...")` won't take place right away like other commands, e.g. CREATE TABLE.
This PR removes these limitations and make us fully support multi-insert.
## How was this patch tested?
new tests in `SQLQuerySuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11754 from cloud-fan/lateral-view.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 969fcdf428..ac2ca3c5a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -134,15 +134,24 @@ class Dataset[T] private[sql]( this(sqlContext, sqlContext.executePlan(logicalPlan), encoder) } - @transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match { - // For various commands (like DDL) and queries with side effects, we force query optimization to - // happen right away to let these side effects take place eagerly. - case _: Command | - _: InsertIntoTable | - _: CreateTableUsingAsSelect => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) - case _ => - queryExecution.analyzed + @transient protected[sql] val logicalPlan: LogicalPlan = { + def hasSideEffects(plan: LogicalPlan): Boolean = plan match { + case _: Command | + _: InsertIntoTable | + _: CreateTableUsingAsSelect => true + case _ => false + } + + queryExecution.logical match { + // For various commands (like DDL) and queries with side effects, we force query execution + // to happen right away to let these side effects take place eagerly. + case p if hasSideEffects(p) => + LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) + case Union(children) if children.forall(hasSideEffects) => + LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) + case _ => + queryExecution.analyzed + } } /** |