-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala  20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  3
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/change-column.sql.out  4
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out  2
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out  2
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/outer-join.sql.out  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala  25
8 files changed, 39 insertions, 23 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3c212d656e..1b04623596 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
+import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.DataStreamWriter
@@ -175,19 +175,13 @@ class Dataset[T] private[sql](
}
@transient private[sql] val logicalPlan: LogicalPlan = {
- def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
- case _: Command |
- _: InsertIntoTable => true
- case _ => false
- }
-
+ // For various commands (like DDL) and queries with side effects, we force query execution
+ // to happen right away to let these side effects take place eagerly.
queryExecution.analyzed match {
- // For various commands (like DDL) and queries with side effects, we force query execution
- // to happen right away to let these side effects take place eagerly.
- case p if hasSideEffects(p) =>
- LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
- case Union(children) if children.forall(hasSideEffects) =>
- LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
+ case c: Command =>
+ LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+ case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
+ LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
case _ =>
queryExecution.analyzed
}
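
The heart of the patch is the hunk above: instead of deferring a command behind a LogicalRDD (whose evaluation can submit a Spark job), Dataset now collects the command's rows once, during construction, and wraps them in a LocalRelation. A minimal sketch of the observable effect — the session setup and names here are ours, not part of the patch:

import org.apache.spark.sql.SparkSession

object EagerCommandDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("eager-command-demo")
      .getOrCreate()

    // The command runs here, eagerly, inside Dataset construction: its rows
    // are collected via executedPlan.executeCollect() and stored in a
    // LocalRelation.
    val dbs = spark.sql("SHOW DATABASES")

    // These actions read the pre-collected local rows; with the previous
    // LogicalRDD-based plan, each of them could submit a Spark job.
    dbs.head()
    dbs.collect().foreach(println)

    spark.stop()
  }
}
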
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 137f7ba04d..6ec2f4d840 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
// SHOW TABLES in Hive only outputs table names, while ours outputs database, table name, and isTemp.
case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
command.executeCollect().map(_.getString(1))
- case command: ExecutedCommandExec =>
- command.executeCollect().map(_.getString(0))
case other =>
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
// We need the types so we can output struct field names
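
Because commands are now materialized before hiveResultString ever runs, the catch-all ExecutedCommandExec branch — which kept only column 0 of each row — is no longer needed, and command output falls through to the generic branch that renders every column. That is also why the four SET golden files below gain a value column. An illustrative fragment, assuming a live SparkSession named `spark` (the tab-join is a simplification of the real formatting code):

val rows = spark.sql("SET spark.sql.caseSensitive=false").collect()

// Old special case kept only the first column:
rows.map(_.getString(0))           // Array("spark.sql.caseSensitive")

// Generic branch keeps all columns (simplified):
rows.map(_.toSeq.mkString("\t"))   // Array("spark.sql.caseSensitive\tfalse")
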
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 027b1481af..20bf4925db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, Strategy}
+import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming._
diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
index 59eb56920c..ba8bc936f0 100644
--- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
@@ -196,7 +196,7 @@ SET spark.sql.caseSensitive=false
-- !query 19 schema
struct<key:string,value:string>
-- !query 19 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive false
-- !query 20
@@ -212,7 +212,7 @@ SET spark.sql.caseSensitive=true
-- !query 21 schema
struct<key:string,value:string>
-- !query 21 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive true
-- !query 22
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
index c64520ff93..c0930bbde6 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
@@ -177,7 +177,7 @@ set spark.sql.groupByOrdinal=false
-- !query 17 schema
struct<key:string,value:string>
-- !query 17 output
-spark.sql.groupByOrdinal
+spark.sql.groupByOrdinal false
-- !query 18
diff --git a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
index 03a4e72d0f..cc47cc67c8 100644
--- a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
@@ -114,7 +114,7 @@ set spark.sql.orderByOrdinal=false
-- !query 9 schema
struct<key:string,value:string>
-- !query 9 output
-spark.sql.orderByOrdinal
+spark.sql.orderByOrdinal false
-- !query 10
diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
index cc50b9444b..5db3bae5d0 100644
--- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
@@ -63,7 +63,7 @@ set spark.sql.crossJoin.enabled = true
-- !query 5 schema
struct<key:string,value:string>
-- !query 5 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled true
-- !query 6
@@ -85,4 +85,4 @@ set spark.sql.crossJoin.enabled = false
-- !query 7 schema
struct<key:string,value:string>
-- !query 7 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 40d0ce0992..03cdfccdda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql
import java.io.File
import java.math.MathContext
import java.sql.Timestamp
+import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.{AccumulatorSuite, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -2564,4 +2566,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(sql(badQuery), Row(1) :: Nil)
}
+ test("SPARK-19650: An action on a Command should not trigger a Spark job") {
+ // Create a listener that checks if new jobs have started.
+ val jobStarted = new AtomicBoolean(false)
+ val listener = new SparkListener {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ jobStarted.set(true)
+ }
+ }
+
+ // Make sure no spurious job starts are pending in the listener bus.
+ sparkContext.listenerBus.waitUntilEmpty(500)
+ sparkContext.addSparkListener(listener)
+ try {
+ // Execute the command.
+ sql("show databases").head()
+
+ // Make sure we have seen all events triggered by executing the command
+ sparkContext.listenerBus.waitUntilEmpty(500)
+ } finally {
+ sparkContext.removeSparkListener(listener)
+ }
+ assert(!jobStarted.get(), "Command should not trigger a Spark job.")
+ }
}
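
The regression test added above pins the behavior down with a SparkListener. The same technique generalizes beyond Spark's own test suite; here is a hedged, standalone sketch — the helper name assertNoSparkJob and the sleep-based drain are our assumptions, since the listenerBus.waitUntilEmpty call used in the test is private[spark]:

import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

object JobAssertions {
  // Runs `body` and fails if any Spark job started while it was running.
  def assertNoSparkJob[T](sc: SparkContext)(body: => T): T = {
    val jobStarted = new AtomicBoolean(false)
    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        jobStarted.set(true)
    }
    sc.addSparkListener(listener)
    val result =
      try body
      finally {
        // Crude substitute for the private listenerBus.waitUntilEmpty: give
        // the listener bus a moment to deliver any pending JobStart events.
        Thread.sleep(500)
        sc.removeSparkListener(listener)
      }
    assert(!jobStarted.get(), "expected no Spark job to be triggered")
    result
  }
}

Usage would look like JobAssertions.assertNoSparkJob(spark.sparkContext) { spark.sql("SHOW TABLES").collect() }.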