author     Andrew Or <andrew@databricks.com>    2015-05-08 17:15:10 -0700
committer  Andrew Or <andrew@databricks.com>    2015-05-08 17:15:10 -0700
commit     bd61f07039064833108070e19b752d4c46045766 (patch)
tree       439e3f0cca8250101fa4ef74a2e719f6ecd1db19 /sql
parent     ffdc40ce7a799f2564f57b958d0f32f1d1636488 (diff)
[SPARK-7469] [SQL] DAG visualization: show SQL query operators
The DAG visualization currently displays only low-level Spark primitives (e.g. `map`, `reduceByKey`, `filter` etc.). For SQL, these aren't particularly useful. Instead, we should display higher level physical operators (e.g. `Filter`, `Exchange`, `ShuffleHashJoin`). cc marmbrus

-----------------

**Before**

<img src="https://issues.apache.org/jira/secure/attachment/12731586/before.png" width="600px"/>

-----------------

**After** (Pay attention to the words)

<img src="https://issues.apache.org/jira/secure/attachment/12731587/after.png" width="600px"/>

-----------------

Author: Andrew Or <andrew@databricks.com>

Closes #5999 from andrewor14/dag-viz-sql and squashes the following commits:

0db23a4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql
1e211db [Andrew Or] Update comment
0d49fd6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql
ffd237a [Andrew Or] Fix style
202dac1 [Andrew Or] Make ignoreParent false by default
e61b1ab [Andrew Or] Visualize SQL operators, not low-level Spark primitives
569034a [Andrew Or] Add a flag to ignore parent settings and scopes
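For readers skimming the diff below: the core of the patch is in `SparkPlan.scala`, which makes `execute()` final (wrapping the computation in an `RDDOperationScope` named after the operator) and moves the operator-specific logic into a new protected `doExecute()`. As a rough sketch of what a physical operator looks like after this change, modeled on the `Filter` hunk in `basicOperators.scala` below (`MyFilter` is a hypothetical name, not part of the patch):

```scala
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._

// Hypothetical operator (not in this patch), modeled on Filter. Operators now
// implement doExecute(); the final execute() in SparkPlan wraps it in an
// RDDOperationScope named after the operator's nodeName, which is what the
// DAG visualization displays instead of low-level RDD primitives.
case class MyFilter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  // Compiled on the driver; recomputed lazily on executors after deserialization.
  @transient lazy val conditionEvaluator: (Row) => Boolean =
    newPredicate(condition, child.output)

  protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
    iter.filter(conditionEvaluator)
  }
}
```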
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 2
27 files changed, 57 insertions(+), 44 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index d9b6fb43ab..0ded1cce68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -267,7 +267,7 @@ private[sql] case class InMemoryColumnarTableScan(
private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
if (enableAccumulators) {
readPartitions.setValue(0)
readBatches.setValue(0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index 18b1ba4c5c..8d16749697 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -121,7 +121,7 @@ case class Aggregate(
}
}
- override def execute(): RDD[Row] = attachTree(this, "execute") {
+ protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
if (groupingExpressions.isEmpty) {
child.execute().mapPartitions { iter =>
val buffer = newAggregateBuffer()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index f0d54cd6cd..f02fa81e95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -109,7 +109,7 @@ case class Exchange(
serializer
}
- override def execute(): RDD[Row] = attachTree(this , "execute") {
+ protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 57effbf7ec..a500269f3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,7 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon
/** Physical plan node for scanning data from an RDD. */
private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
- override def execute(): RDD[Row] = rdd
+ protected override def doExecute(): RDD[Row] = rdd
}
/** Logical plan node for scanning data from a local collection. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 575849481f..f16ca36909 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -43,7 +43,7 @@ case class Expand(
// as UNKNOWN partitioning
override def outputPartitioning: Partitioning = UnknownPartitioning(0)
- override def execute(): RDD[Row] = attachTree(this, "execute") {
+ protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
child.execute().mapPartitions { iter =>
// TODO Move out projection objects creation and transfer to
// workers via closure. However we can't assume the Projection
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 5201e20a10..08d9079335 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -46,7 +46,7 @@ case class Generate(
val boundGenerator = BindReferences.bindReference(generator, child.output)
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
if (join) {
child.execute().mapPartitions { iter =>
val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 5d9f202681..2ec7d4fbc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -66,7 +66,7 @@ case class GeneratedAggregate(
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val aggregatesToCompute = aggregateExpressions.flatMap { a =>
a.collect { case agg: AggregateExpression => agg}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index ace9af5f38..03bee80ad7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -30,7 +30,7 @@ private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) e
private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
- override def execute(): RDD[Row] = rdd
+ protected override def doExecute(): RDD[Row] = rdd
override def executeCollect(): Array[Row] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 59c89800da..435ac01117 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
import org.apache.spark.sql.catalyst.expressions._
@@ -79,14 +79,25 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
/**
- * Runs this query returning the result as an RDD.
+ * Returns the result of this query as an RDD[Row] by delegating to doExecute
+ * after adding query plan information to created RDDs for visualization.
+ * Concrete implementations of SparkPlan should override doExecute instead.
*/
- def execute(): RDD[Row]
+ final def execute(): RDD[Row] = {
+ RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
+ doExecute()
+ }
+ }
/**
- * Runs this query returning the result as an array.
+ * Overridden by concrete implementations of SparkPlan.
+ * Produces the result of the query as an RDD[Row]
*/
+ protected def doExecute(): RDD[Row]
+ /**
+ * Runs this query returning the result as an array.
+ */
def executeCollect(): Array[Row] = {
execute().mapPartitions { iter =>
val converter = CatalystTypeConverters.createToScalaConverter(schema)
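A note on the `RDDOperationScope.withScope(sparkContext, nodeName, false, true)` call above (my reading of this hunk, not text from the patch): the third argument disallows nested scopes and the fourth is the `ignoreParent` flag mentioned in the squashed commits, so RDDs built inside `doExecute()` are grouped under a scope named after the SQL operator instead of inheriting a low-level scope such as `map` or `filter`. From a caller's point of view the new contract looks roughly like this (`runPlan` is a hypothetical helper, not in the patch):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical helper: callers are unaffected by the patch. They still call
// execute(), which is now final, attaches the operator's nodeName as the RDD
// scope, and then delegates to the operator's protected doExecute().
def runPlan(plan: SparkPlan): RDD[Row] = {
  // plan.doExecute()  // would not compile here: doExecute() is protected
  plan.execute()
}
```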
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 217b559def..c4327ce262 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -112,7 +112,7 @@ case class Window(
}
}
- def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
new Iterator[Row] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 5ca11e67a9..6cb67b4bbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -37,7 +37,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
@transient lazy val buildProjection = newMutableProjection(projectList, child.output)
- override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
+ protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
val resuableProjection = buildProjection()
iter.map(resuableProjection)
}
@@ -54,7 +54,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
@transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output)
- override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
+ protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
iter.filter(conditionEvaluator)
}
@@ -83,7 +83,7 @@ case class Sample(
override def output: Seq[Attribute] = child.output
// TODO: How to pick seed?
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
if (withReplacement) {
child.execute().map(_.copy()).sample(withReplacement, upperBound - lowerBound, seed)
} else {
@@ -99,7 +99,7 @@ case class Sample(
case class Union(children: Seq[SparkPlan]) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
override def output: Seq[Attribute] = children.head.output
- override def execute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
+ protected override def doExecute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
}
/**
@@ -124,7 +124,7 @@ case class Limit(limit: Int, child: SparkPlan)
override def executeCollect(): Array[Row] = child.executeTake(limit)
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
child.execute().mapPartitions { iter =>
iter.take(limit).map(row => (false, row.copy()))
@@ -166,7 +166,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
- override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
+ protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
override def outputOrdering: Seq[SortOrder] = sortOrder
}
@@ -186,7 +186,7 @@ case class Sort(
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
- override def execute(): RDD[Row] = attachTree(this, "sort") {
+ protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
iterator.map(_.copy()).toArray.sorted(ordering).iterator
@@ -214,7 +214,7 @@ case class ExternalSort(
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
- override def execute(): RDD[Row] = attachTree(this, "sort") {
+ protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
@@ -244,7 +244,7 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
override def requiredChildDistribution: Seq[Distribution] =
if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
val hashSet = new scala.collection.mutable.HashSet[Row]()
@@ -270,7 +270,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
}
}
@@ -285,7 +285,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
}
}
@@ -299,7 +299,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = children.head.output
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}
@@ -314,5 +314,5 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children: Seq[SparkPlan] = child :: Nil
- def execute(): RDD[Row] = child.execute()
+ protected override def doExecute(): RDD[Row] = child.execute()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 388a8184e4..49b361e96b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -64,7 +64,7 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val converted = sideEffectResult.map(r =>
CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[Row])
sqlContext.sparkContext.parallelize(converted, 1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 710787096e..dffb265601 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -125,7 +125,7 @@ package object debug {
}
}
- def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
new Iterator[Row] {
def hasNext: Boolean = iter.hasNext
@@ -193,7 +193,7 @@ package object debug {
def children: List[SparkPlan] = child :: Nil
- def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
child.execute().map { row =>
try typeCheck(row, child.schema) catch {
case e: Exception =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 926f5e6c13..05dd5681ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -66,7 +66,7 @@ case class BroadcastHashJoin(
sparkContext.broadcast(hashed)
}
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index 3ef1e0d7fb..640fc26ba3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -38,7 +38,7 @@ case class BroadcastLeftSemiJoinHash(
override def output: Seq[Attribute] = left.output
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 6aaf35fb42..caad3dfbe1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -61,7 +61,7 @@ case class BroadcastNestedLoopJoin(
@transient private lazy val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 1cbc98354d..191c00cb55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val leftResults = left.execute().map(_.copy())
val rightResults = right.execute().map(_.copy())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index a396c0f5d5..4557439299 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -183,7 +183,7 @@ case class HashOuterJoin(
hashTable
}
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val joinedRow = new JoinedRow()
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
// TODO this probably can be replaced by external sort (sort merged join?)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index b03af410dc..036423e6fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -47,7 +47,7 @@ case class LeftSemiJoinBNL(
@transient private lazy val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index a04f2a63b5..8ad27eae80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -42,7 +42,7 @@ case class LeftSemiJoinHash(
override def output: Seq[Attribute] = left.output
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index a6cd8337c1..219525d9d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -43,7 +43,7 @@ case class ShuffledHashJoin(
override def requiredChildDistribution: Seq[ClusteredDistribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
hashJoin(streamIter, hashed)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index b5123668ba..1a39fb4b96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -60,7 +60,7 @@ case class SortMergeJoin(
private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
keys.map(SortOrder(_, Ascending))
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val leftResults = left.execute().map(_.copy())
val rightResults = right.execute().map(_.copy())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 58cb1980f2..3dbc383795 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -228,7 +228,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
def children: Seq[SparkPlan] = child :: Nil
- def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
val childResults = child.execute().map(_.copy())
val parent = childResults.mapPartitions { iter =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index aded126ea0..75ac52d4a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -77,7 +77,7 @@ private[sql] case class ParquetTableScan(
}
}.toArray
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
import parquet.filter2.compat.FilterCompat.FilterPredicateCompat
val sc = sqlContext.sparkContext
@@ -255,7 +255,7 @@ private[sql] case class InsertIntoParquetTable(
/**
* Inserts all rows into the Parquet file.
*/
- override def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
// TODO: currently we do not check whether the "schema"s are compatible
// That means if one first creates a table and then INSERTs data with
// and incompatible schema the execution will fail. It would be nice
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 0a5f19eee7..62dc4167b7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -129,7 +129,7 @@ case class HiveTableScan(
}
}
- override def execute(): RDD[Row] = if (!relation.hiveQlTable.isPartitioned) {
+ protected override def doExecute(): RDD[Row] = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
} else {
hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index de8954d5de..c0b0b104e9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -258,5 +258,7 @@ case class InsertIntoHiveTable(
override def executeCollect(): Array[Row] = sideEffectResult.toArray
- override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+ protected override def doExecute(): RDD[Row] = {
+ sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 3eddda3b28..bfd26e0170 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -54,7 +54,7 @@ case class ScriptTransformation(
override def otherCopyArgs: Seq[HiveContext] = sc :: Nil
- def execute(): RDD[Row] = {
+ protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
val cmd = List("/bin/bash", "-c", script)
val builder = new ProcessBuilder(cmd)