diff options
8 files changed, 39 insertions, 23 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3c212d656e..1b04623596 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection} import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView} +import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.DataStreamWriter @@ -175,19 +175,13 @@ class Dataset[T] private[sql]( } @transient private[sql] val logicalPlan: LogicalPlan = { - def hasSideEffects(plan: LogicalPlan): Boolean = plan match { - case _: Command | - _: InsertIntoTable => true - case _ => false - } - + // For various commands (like DDL) and queries with side effects, we force query execution + // to happen right away to let these side effects take place eagerly. queryExecution.analyzed match { - // For various commands (like DDL) and queries with side effects, we force query execution - // to happen right away to let these side effects take place eagerly. - case p if hasSideEffects(p) => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession) - case Union(children) if children.forall(hasSideEffects) => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession) + case c: Command => + LocalRelation(c.output, queryExecution.executedPlan.executeCollect()) + case u @ Union(children) if children.forall(_.isInstanceOf[Command]) => + LocalRelation(u.output, queryExecution.executedPlan.executeCollect()) case _ => queryExecution.analyzed } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 137f7ba04d..6ec2f4d840 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp. case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] => command.executeCollect().map(_.getString(1)) - case command: ExecutedCommandExec => - command.executeCollect().map(_.getString(0)) case other => val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq // We need the types so we can output struct field names diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 027b1481af..20bf4925db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SaveMode, Strategy} +import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.execution.streaming._ diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out index 59eb56920c..ba8bc936f0 100644 --- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out @@ -196,7 +196,7 @@ SET spark.sql.caseSensitive=false -- !query 19 schema struct<key:string,value:string> -- !query 19 output -spark.sql.caseSensitive +spark.sql.caseSensitive false -- !query 20 @@ -212,7 +212,7 @@ SET spark.sql.caseSensitive=true -- !query 21 schema struct<key:string,value:string> -- !query 21 output -spark.sql.caseSensitive +spark.sql.caseSensitive true -- !query 22 diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out index c64520ff93..c0930bbde6 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out @@ -177,7 +177,7 @@ set spark.sql.groupByOrdinal=false -- !query 17 schema struct<key:string,value:string> -- !query 17 output -spark.sql.groupByOrdinal +spark.sql.groupByOrdinal false -- !query 18 diff --git a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out index 03a4e72d0f..cc47cc67c8 100644 --- a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out @@ -114,7 +114,7 @@ set spark.sql.orderByOrdinal=false -- !query 9 schema struct<key:string,value:string> -- !query 9 output -spark.sql.orderByOrdinal +spark.sql.orderByOrdinal false -- !query 10 diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out index cc50b9444b..5db3bae5d0 100644 --- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out @@ -63,7 +63,7 @@ set spark.sql.crossJoin.enabled = true -- !query 5 schema struct<key:string,value:string> -- !query 5 output -spark.sql.crossJoin.enabled +spark.sql.crossJoin.enabled true -- !query 6 @@ -85,4 +85,4 @@ set spark.sql.crossJoin.enabled = false -- !query 7 schema struct<key:string,value:string> -- !query 7 output -spark.sql.crossJoin.enabled +spark.sql.crossJoin.enabled false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 40d0ce0992..03cdfccdda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -20,8 +20,10 @@ package org.apache.spark.sql import java.io.File import java.math.MathContext import java.sql.Timestamp +import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.{AccumulatorSuite, SparkException} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} @@ -2564,4 +2566,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer(sql(badQuery), Row(1) :: Nil) } + test("SPARK-19650: An action on a Command should not trigger a Spark job") { + // Create a listener that checks if new jobs have started. + val jobStarted = new AtomicBoolean(false) + val listener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStarted.set(true) + } + } + + // Make sure no spurious job starts are pending in the listener bus. + sparkContext.listenerBus.waitUntilEmpty(500) + sparkContext.addSparkListener(listener) + try { + // Execute the command. + sql("show databases").head() + + // Make sure we have seen all events triggered by DataFrame.show() + sparkContext.listenerBus.waitUntilEmpty(500) + } finally { + sparkContext.removeSparkListener(listener) + } + assert(!jobStarted.get(), "Command should not trigger a Spark job.") + } } |