diff options
Diffstat (limited to 'sql/core/src/main')
3 files changed, 8 insertions, 17 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3c212d656e..1b04623596 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection} import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView} +import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.DataStreamWriter @@ -175,19 +175,13 @@ class Dataset[T] private[sql]( } @transient private[sql] val logicalPlan: LogicalPlan = { - def hasSideEffects(plan: LogicalPlan): Boolean = plan match { - case _: Command | - _: InsertIntoTable => true - case _ => false - } - + // For various commands (like DDL) and queries with side effects, we force query execution + // to happen right away to let these side effects take place eagerly. queryExecution.analyzed match { - // For various commands (like DDL) and queries with side effects, we force query execution - // to happen right away to let these side effects take place eagerly. - case p if hasSideEffects(p) => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession) - case Union(children) if children.forall(hasSideEffects) => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession) + case c: Command => + LocalRelation(c.output, queryExecution.executedPlan.executeCollect()) + case u @ Union(children) if children.forall(_.isInstanceOf[Command]) => + LocalRelation(u.output, queryExecution.executedPlan.executeCollect()) case _ => queryExecution.analyzed } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 137f7ba04d..6ec2f4d840 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp. case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] => command.executeCollect().map(_.getString(1)) - case command: ExecutedCommandExec => - command.executeCollect().map(_.getString(0)) case other => val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq // We need the types so we can output struct field names diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 027b1481af..20bf4925db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SaveMode, Strategy} +import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.execution.streaming._ |