diff options
author | Herman van Hovell <hvanhovell@databricks.com> | 2016-06-15 21:33:26 -0700 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-06-15 21:33:26 -0700 |
commit | b75f454f946714b93fe561055cd53b0686187d2e (patch) | |
tree | 33ff1d7a58a1484e15cdd483e27443fd90410f61 | |
parent | 5a52ba0f952b21818ed73cb253381f6a3799dc46 (diff) | |
download | spark-b75f454f946714b93fe561055cd53b0686187d2e.tar.gz spark-b75f454f946714b93fe561055cd53b0686187d2e.tar.bz2 spark-b75f454f946714b93fe561055cd53b0686187d2e.zip |
[SPARK-15824][SQL] Execute WITH .... INSERT ... statements immediately
## What changes were proposed in this pull request?
We currently immediately execute `INSERT` commands when they are issued. This is not the case as soon as we use a `WITH` to define common table expressions, for example:
```sql
WITH
tbl AS (SELECT * FROM x WHERE id = 10)
INSERT INTO y
SELECT *
FROM tbl
```
This PR fixes this problem. This PR closes https://github.com/apache/spark/pull/13561 (which fixes the a instance of this problem in the ThriftSever).
## How was this patch tested?
Added a test to `InsertSuite`
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes #13678 from hvanhovell/SPARK-15824.
3 files changed, 27 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index fba4066af6..02cc3985b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -179,7 +179,7 @@ class Dataset[T] private[sql]( case _ => false } - queryExecution.logical match { + queryExecution.analyzed match { // For various commands (like DDL) and queries with side effects, we force query execution // to happen right away to let these side effects take place eagerly. case p if hasSideEffects(p) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 4780eb473d..bade41b1ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.sources import java.io.File import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -259,4 +260,28 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { spark.catalog.dropTempView("oneToTen") } + + test("SPARK-15824 - Execute an INSERT wrapped in a WITH statement immediately") { + withTable("target", "target2") { + sql(s"CREATE TABLE target(a INT, b STRING) USING JSON") + sql("WITH tbl AS (SELECT * FROM jt) INSERT OVERWRITE TABLE target SELECT a, b FROM tbl") + checkAnswer( + sql("SELECT a, b FROM target"), + sql("SELECT a, b FROM jt") + ) + + sql(s"CREATE TABLE target2(a INT, b STRING) USING JSON") + val e = sql( + """ + |WITH tbl AS (SELECT * FROM jt) + |FROM tbl + |INSERT INTO target2 SELECT a, b WHERE a <= 5 + |INSERT INTO target2 SELECT a, b WHERE a > 5 + """.stripMargin) + checkAnswer( + sql("SELECT a, b FROM target2"), + sql("SELECT a, b FROM jt") + ) + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d24cde2321..224ff3823b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -544,7 +544,7 @@ private[hive] case class InsertIntoHiveTable( child: LogicalPlan, overwrite: Boolean, ifNotExists: Boolean) - extends LogicalPlan { + extends LogicalPlan with Command { override def children: Seq[LogicalPlan] = child :: Nil override def output: Seq[Attribute] = Seq.empty |