author     Herman van Hovell <hvanhovell@databricks.com>  2017-02-24 23:05:36 -0800
committer  Wenchen Fan <wenchen@databricks.com>  2017-02-24 23:05:59 -0800
commit     8f0511ed49a353fb0745f320a84063ced5cc1857
tree       cb770f0843aa77f2afaa81a98e12a2a1231ce60c
parent     4cb025afafe63d5871356d9dc38d58c1df0da996
[SPARK-19650] Commands should not trigger a Spark job
Spark executes SQL commands eagerly, by creating an RDD that contains the command's results. The downside is that any action on this RDD triggers a Spark job, which is expensive and unnecessary. This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it simply materializes the results and puts them in a `LocalRelation`.

Added a regression test to `SQLQuerySuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17027 from hvanhovell/no-job-command.
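A minimal sketch of the user-visible difference, assuming an existing `SparkSession` named `spark` (illustrative only, not code from the patch):

    // `spark` is assumed to be an existing SparkSession.
    val df = spark.sql("SHOW DATABASES") // the command itself still runs eagerly here

    // Before SPARK-19650: the eagerly computed result was wrapped in a LogicalRDD,
    // so an action like head() scheduled a Spark job just to read it back.
    // After SPARK-19650: the result is held in a LocalRelation, so head() is a
    // purely local read and no job is started.
    df.head()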
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 3
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/change-column.sql.out | 4
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out | 2
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out | 2
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/outer-join.sql.out | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 25
8 files changed, 39 insertions(+), 23 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3c212d656e..1b04623596 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
+import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.DataStreamWriter
@@ -175,19 +175,13 @@ class Dataset[T] private[sql](
}
@transient private[sql] val logicalPlan: LogicalPlan = {
- def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
- case _: Command |
- _: InsertIntoTable => true
- case _ => false
- }
-
+ // For various commands (like DDL) and queries with side effects, we force query execution
+ // to happen right away to let these side effects take place eagerly.
queryExecution.analyzed match {
- // For various commands (like DDL) and queries with side effects, we force query execution
- // to happen right away to let these side effects take place eagerly.
- case p if hasSideEffects(p) =>
- LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
- case Union(children) if children.forall(hasSideEffects) =>
- LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
+ case c: Command =>
+ LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+ case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
+ LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
case _ =>
queryExecution.analyzed
}
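Read in isolation, the new branch boils down to the following simplified sketch (not the actual `Dataset` code; types spelled out for clarity):

    // executeCollect() runs the command on the driver and returns its rows as an
    // in-memory Array[InternalRow]; LocalRelation simply wraps those rows, so any
    // later action on the Dataset reads them locally instead of launching a job.
    val rows: Array[InternalRow] = queryExecution.executedPlan.executeCollect()
    val eagerPlan: LogicalPlan = LocalRelation(queryExecution.analyzed.output, rows)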
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 137f7ba04d..6ec2f4d840 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
// SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp.
case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
command.executeCollect().map(_.getString(1))
- case command: ExecutedCommandExec =>
- command.executeCollect().map(_.getString(0))
case other =>
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
// We need the types so we can output struct field names
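With the blanket `ExecutedCommandExec` case removed, command results now fall through to the generic branch, which renders every output column instead of only the first. This is why the `SET` results in the golden files below gain a value column, e.g. `spark.sql.caseSensitive false` rather than just `spark.sql.caseSensitive`.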
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 027b1481af..20bf4925db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, Strategy}
+import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming._
diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
index 59eb56920c..ba8bc936f0 100644
--- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
@@ -196,7 +196,7 @@ SET spark.sql.caseSensitive=false
-- !query 19 schema
struct<key:string,value:string>
-- !query 19 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive false
-- !query 20
@@ -212,7 +212,7 @@ SET spark.sql.caseSensitive=true
-- !query 21 schema
struct<key:string,value:string>
-- !query 21 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive true
-- !query 22
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
index c64520ff93..c0930bbde6 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
@@ -177,7 +177,7 @@ set spark.sql.groupByOrdinal=false
-- !query 17 schema
struct<key:string,value:string>
-- !query 17 output
-spark.sql.groupByOrdinal
+spark.sql.groupByOrdinal false
-- !query 18
diff --git a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
index 03a4e72d0f..cc47cc67c8 100644
--- a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
@@ -114,7 +114,7 @@ set spark.sql.orderByOrdinal=false
-- !query 9 schema
struct<key:string,value:string>
-- !query 9 output
-spark.sql.orderByOrdinal
+spark.sql.orderByOrdinal false
-- !query 10
diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
index cc50b9444b..5db3bae5d0 100644
--- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
@@ -63,7 +63,7 @@ set spark.sql.crossJoin.enabled = true
-- !query 5 schema
struct<key:string,value:string>
-- !query 5 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled true
-- !query 6
@@ -85,4 +85,4 @@ set spark.sql.crossJoin.enabled = false
-- !query 7 schema
struct<key:string,value:string>
-- !query 7 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 40d0ce0992..03cdfccdda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql
import java.io.File
import java.math.MathContext
import java.sql.Timestamp
+import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.{AccumulatorSuite, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -2564,4 +2566,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(sql(badQuery), Row(1) :: Nil)
}
+ test("SPARK-19650: An action on a Command should not trigger a Spark job") {
+ // Create a listener that checks if new jobs have started.
+ val jobStarted = new AtomicBoolean(false)
+ val listener = new SparkListener {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ jobStarted.set(true)
+ }
+ }
+
+ // Make sure no spurious job starts are pending in the listener bus.
+ sparkContext.listenerBus.waitUntilEmpty(500)
+ sparkContext.addSparkListener(listener)
+ try {
+ // Execute the command.
+ sql("show databases").head()
+
+ // Make sure we have seen all events triggered by the command above.
+ sparkContext.listenerBus.waitUntilEmpty(500)
+ } finally {
+ sparkContext.removeSparkListener(listener)
+ }
+ assert(!jobStarted.get(), "Command should not trigger a Spark job.")
+ }
}
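The listener technique in the test generalizes. A hedged sketch of a reusable helper (hypothetical, not part of the patch), assuming the same suite where `sparkContext` and the listener imports above are in scope:

    // Hypothetical helper: asserts that `body` runs without scheduling a Spark job.
    private def assertNoSparkJob(body: => Unit): Unit = {
      val jobStarted = new AtomicBoolean(false)
      val listener = new SparkListener {
        override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobStarted.set(true)
      }
      sparkContext.listenerBus.waitUntilEmpty(500) // drain any pending events first
      sparkContext.addSparkListener(listener)
      try {
        body
        sparkContext.listenerBus.waitUntilEmpty(500) // wait for events from `body`
      } finally {
        sparkContext.removeSparkListener(listener)
      }
      assert(!jobStarted.get(), "Unexpected Spark job was triggered.")
    }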