diff options
author | Herman van Hovell <hvanhovell@databricks.com> | 2017-02-24 23:05:36 -0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-02-24 23:05:59 -0800 |
commit | 8f0511ed49a353fb0745f320a84063ced5cc1857 (patch) | |
tree | cb770f0843aa77f2afaa81a98e12a2a1231ce60c | |
parent | 4cb025afafe63d5871356d9dc38d58c1df0da996 (diff) | |
download | spark-8f0511ed49a353fb0745f320a84063ced5cc1857.tar.gz spark-8f0511ed49a353fb0745f320a84063ced5cc1857.tar.bz2 spark-8f0511ed49a353fb0745f320a84063ced5cc1857.zip |
[SPARK-19650] Commands should not trigger a Spark job
Spark executes SQL commands eagerly. It does this by creating an RDD which contains the command's results. The downside to this is that any action on this RDD triggers a Spark job which is expensive and is unnecessary.
This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it just materializes the result and puts them in a `LocalRelation`.
Added a regression test to `SQLQuerySuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes #17027 from hvanhovell/no-job-command.
8 files changed, 39 insertions, 23 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3c212d656e..1b04623596 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection} import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView} +import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.DataStreamWriter @@ -175,19 +175,13 @@ class Dataset[T] private[sql]( } @transient private[sql] val logicalPlan: LogicalPlan = { - def hasSideEffects(plan: LogicalPlan): Boolean = plan match { - case _: Command | - _: InsertIntoTable => true - case _ => false - } - + // For various commands (like DDL) and queries with side effects, we force query execution + // to happen right away to let these side effects take place eagerly. queryExecution.analyzed match { - // For various commands (like DDL) and queries with side effects, we force query execution - // to happen right away to let these side effects take place eagerly. - case p if hasSideEffects(p) => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession) - case Union(children) if children.forall(hasSideEffects) => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession) + case c: Command => + LocalRelation(c.output, queryExecution.executedPlan.executeCollect()) + case u @ Union(children) if children.forall(_.isInstanceOf[Command]) => + LocalRelation(u.output, queryExecution.executedPlan.executeCollect()) case _ => queryExecution.analyzed } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 137f7ba04d..6ec2f4d840 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp. case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] => command.executeCollect().map(_.getString(1)) - case command: ExecutedCommandExec => - command.executeCollect().map(_.getString(0)) case other => val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq // We need the types so we can output struct field names diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 027b1481af..20bf4925db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SaveMode, Strategy} +import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.execution.streaming._ diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out index 59eb56920c..ba8bc936f0 100644 --- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out @@ -196,7 +196,7 @@ SET spark.sql.caseSensitive=false -- !query 19 schema struct<key:string,value:string> -- !query 19 output -spark.sql.caseSensitive +spark.sql.caseSensitive false -- !query 20 @@ -212,7 +212,7 @@ SET spark.sql.caseSensitive=true -- !query 21 schema struct<key:string,value:string> -- !query 21 output -spark.sql.caseSensitive +spark.sql.caseSensitive true -- !query 22 diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out index c64520ff93..c0930bbde6 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out @@ -177,7 +177,7 @@ set spark.sql.groupByOrdinal=false -- !query 17 schema struct<key:string,value:string> -- !query 17 output -spark.sql.groupByOrdinal +spark.sql.groupByOrdinal false -- !query 18 diff --git a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out index 03a4e72d0f..cc47cc67c8 100644 --- a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out @@ -114,7 +114,7 @@ set spark.sql.orderByOrdinal=false -- !query 9 schema struct<key:string,value:string> -- !query 9 output -spark.sql.orderByOrdinal +spark.sql.orderByOrdinal false -- !query 10 diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out index cc50b9444b..5db3bae5d0 100644 --- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out @@ -63,7 +63,7 @@ set spark.sql.crossJoin.enabled = true -- !query 5 schema struct<key:string,value:string> -- !query 5 output -spark.sql.crossJoin.enabled +spark.sql.crossJoin.enabled true -- !query 6 @@ -85,4 +85,4 @@ set spark.sql.crossJoin.enabled = false -- !query 7 schema struct<key:string,value:string> -- !query 7 output -spark.sql.crossJoin.enabled +spark.sql.crossJoin.enabled false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 40d0ce0992..03cdfccdda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -20,8 +20,10 @@ package org.apache.spark.sql import java.io.File import java.math.MathContext import java.sql.Timestamp +import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.{AccumulatorSuite, SparkException} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} @@ -2564,4 +2566,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer(sql(badQuery), Row(1) :: Nil) } + test("SPARK-19650: An action on a Command should not trigger a Spark job") { + // Create a listener that checks if new jobs have started. + val jobStarted = new AtomicBoolean(false) + val listener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStarted.set(true) + } + } + + // Make sure no spurious job starts are pending in the listener bus. + sparkContext.listenerBus.waitUntilEmpty(500) + sparkContext.addSparkListener(listener) + try { + // Execute the command. + sql("show databases").head() + + // Make sure we have seen all events triggered by DataFrame.show() + sparkContext.listenerBus.waitUntilEmpty(500) + } finally { + sparkContext.removeSparkListener(listener) + } + assert(!jobStarted.get(), "Command should not trigger a Spark job.") + } } |