aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2017-02-24 23:05:36 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-24 23:05:59 -0800
commit8f0511ed49a353fb0745f320a84063ced5cc1857 (patch)
treecb770f0843aa77f2afaa81a98e12a2a1231ce60c /sql/core/src/main
parent4cb025afafe63d5871356d9dc38d58c1df0da996 (diff)
downloadspark-8f0511ed49a353fb0745f320a84063ced5cc1857.tar.gz
spark-8f0511ed49a353fb0745f320a84063ced5cc1857.tar.bz2
spark-8f0511ed49a353fb0745f320a84063ced5cc1857.zip
[SPARK-19650] Commands should not trigger a Spark job
Spark executes SQL commands eagerly. It does this by creating an RDD which contains the command's results. The downside to this is that any action on this RDD triggers a Spark job which is expensive and is unnecessary. This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it just materializes the result and puts them in a `LocalRelation`. Added a regression test to `SQLQuerySuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17027 from hvanhovell/no-job-command.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala20
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala3
3 files changed, 8 insertions, 17 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3c212d656e..1b04623596 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
+import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.DataStreamWriter
@@ -175,19 +175,13 @@ class Dataset[T] private[sql](
}
@transient private[sql] val logicalPlan: LogicalPlan = {
- def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
- case _: Command |
- _: InsertIntoTable => true
- case _ => false
- }
-
+ // For various commands (like DDL) and queries with side effects, we force query execution
+ // to happen right away to let these side effects take place eagerly.
queryExecution.analyzed match {
- // For various commands (like DDL) and queries with side effects, we force query execution
- // to happen right away to let these side effects take place eagerly.
- case p if hasSideEffects(p) =>
- LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
- case Union(children) if children.forall(hasSideEffects) =>
- LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
+ case c: Command =>
+ LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+ case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
+ LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
case _ =>
queryExecution.analyzed
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 137f7ba04d..6ec2f4d840 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
// SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp.
case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
command.executeCollect().map(_.getString(1))
- case command: ExecutedCommandExec =>
- command.executeCollect().map(_.getString(0))
case other =>
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
// We need the types so we can output struct field names
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 027b1481af..20bf4925db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, Strategy}
+import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming._