author     Reynold Xin <rxin@databricks.com>    2016-04-22 17:43:56 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-22 17:43:56 -0700
commit     d7d0cad0ad7667c0e09ae01601ee0e4d0b09963c (patch)
tree       10ab6d65e25c6451a1cdde83321c989a6e901856
parent     c431a76d0628985bb445189b9a2913dd41b86f7b (diff)
[SPARK-14855][SQL] Add "Exec" suffix to physical operators
## What changes were proposed in this pull request?

This patch adds an "Exec" suffix to all physical operators. Before this patch, Spark's physical operators and logical operators were named the same (e.g. Project could be either logical.Project or execution.Project), which caused small issues in code review and bigger issues in code refactoring.

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #12617 from rxin/exec-node.
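For illustration only (not part of this patch), below is a minimal Scala sketch of the call-site ambiguity the rename removes. Before the patch both operators were literally named Project, so one of the two imports had to be aliased; after the patch the physical node is ProjectExec and the names no longer collide. The describe helper is hypothetical.

// Hypothetical sketch of the naming clash this patch resolves.
import org.apache.spark.sql.catalyst.plans.logical.{Project => LogicalProject}
import org.apache.spark.sql.execution.ProjectExec   // was execution.Project before this patch

def describe(plan: Any): String = plan match {
  case _: LogicalProject => "logical projection (optimizer node)"
  case _: ProjectExec    => "physical projection (executed operator)"
  case other             => s"other node: ${other.getClass.getSimpleName}"
}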
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala24
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala)4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala)4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala)4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala)4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala110
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala)22
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala)4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala)6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala50
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala)6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala18
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala)12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala)6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala)6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala)4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala)6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala)8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala19
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala39
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala)2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala15
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala115
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala12
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala36
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala16
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala21
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala36
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala16
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala4
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala8
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala (renamed from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala)4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala12
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala14
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala10
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala4
77 files changed, 473 insertions, 436 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 232ca43588..3d0e016a09 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -408,8 +408,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
}
- /** Returns the name of this type of TreeNode. Defaults to the class name. */
- def nodeName: String = getClass.getSimpleName
+ /**
+ * Returns the name of this type of TreeNode. Defaults to the class name.
+ * Note that we remove the "Exec" suffix for physical operators here.
+ */
+ def nodeName: String = getClass.getSimpleName.replaceAll("Exec$", "")
/**
* The arguments that should be included in the arg string. Defaults to the `productIterator`.
@@ -426,7 +429,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case other => other :: Nil
}.mkString(", ")
- /** String representation of this node without any children */
+ /** String representation of this node without any children. */
def simpleString: String = s"$nodeName $argString".trim
override def toString: String = treeString
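The TreeNode change above strips a trailing "Exec" from nodeName, so the renamed physical operators keep their old display names in explain output and the SQL UI. A standalone check of that behaviour follows; it is a sketch that only reproduces the replaceAll("Exec$", "") call, and NodeNameDemo itself is hypothetical.

object NodeNameDemo {
  // Mirrors the new TreeNode.nodeName logic: drop "Exec" only when it is a suffix.
  private def displayName(className: String): String = className.replaceAll("Exec$", "")

  def main(args: Array[String]): Unit = {
    assert(displayName("ProjectExec") == "Project")                  // shown as "Project" in plans
    assert(displayName("SortMergeJoinExec") == "SortMergeJoin")
    assert(displayName("ExecutedCommandExec") == "ExecutedCommand")  // only the trailing "Exec" goes
    assert(displayName("Project") == "Project")                      // logical node names are untouched
  }
}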
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 12d03a7df8..b3a197cd96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -100,10 +100,10 @@ private[sql] case class LogicalRDD(
}
/** Physical plan node for scanning data from an RDD. */
-private[sql] case class PhysicalRDD(
+private[sql] case class RDDScanExec(
output: Seq[Attribute],
rdd: RDD[InternalRow],
- override val nodeName: String) extends LeafNode {
+ override val nodeName: String) extends LeafExecNode {
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
@@ -124,7 +124,7 @@ private[sql] case class PhysicalRDD(
}
}
-private[sql] trait DataSourceScan extends LeafNode {
+private[sql] trait DataSourceScanExec extends LeafExecNode {
val rdd: RDD[InternalRow]
val relation: BaseRelation
@@ -132,19 +132,19 @@ private[sql] trait DataSourceScan extends LeafNode {
// Ignore rdd when checking results
override def sameResult(plan: SparkPlan): Boolean = plan match {
- case other: DataSourceScan => relation == other.relation && metadata == other.metadata
+ case other: DataSourceScanExec => relation == other.relation && metadata == other.metadata
case _ => false
}
}
/** Physical plan node for scanning data from a relation. */
-private[sql] case class RowDataSourceScan(
+private[sql] case class RowDataSourceScanExec(
output: Seq[Attribute],
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
override val metadata: Map[String, String] = Map.empty)
- extends DataSourceScan with CodegenSupport {
+ extends DataSourceScanExec with CodegenSupport {
private[sql] override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
@@ -207,13 +207,13 @@ private[sql] case class RowDataSourceScan(
}
/** Physical plan node for scanning data from a batched relation. */
-private[sql] case class BatchedDataSourceScan(
+private[sql] case class BatchedDataSourceScanExec(
output: Seq[Attribute],
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
override val metadata: Map[String, String] = Map.empty)
- extends DataSourceScan with CodegenSupport {
+ extends DataSourceScanExec with CodegenSupport {
private[sql] override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
@@ -316,7 +316,7 @@ private[sql] case class BatchedDataSourceScan(
}
}
-private[sql] object DataSourceScan {
+private[sql] object DataSourceScanExec {
// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"
@@ -325,7 +325,7 @@ private[sql] object DataSourceScan {
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation,
- metadata: Map[String, String] = Map.empty): DataSourceScan = {
+ metadata: Map[String, String] = Map.empty): DataSourceScanExec = {
val outputPartitioning = {
val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
@@ -349,9 +349,9 @@ private[sql] object DataSourceScan {
relation match {
case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, relation.schema) =>
- BatchedDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+ BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
case _ =>
- RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+ RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
index 3966af542e..7c4756663a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
@@ -33,11 +33,11 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
* @param output The output Schema
* @param child Child operator
*/
-case class Expand(
+case class ExpandExec(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: SparkPlan)
- extends UnaryNode with CodegenSupport {
+ extends UnaryExecNode with CodegenSupport {
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 9938d2169f..10cfec3330 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -47,13 +47,13 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
* @param output the output attributes of this node, which constructed in analysis phase,
* and we can not change it, as the parent node bound with it already.
*/
-case class Generate(
+case class GenerateExec(
generator: Generator,
join: Boolean,
outer: Boolean,
output: Seq[Attribute],
child: SparkPlan)
- extends UnaryNode {
+ extends UnaryExecNode {
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index f8aec9e7a1..4ab447a47b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* Physical plan node for scanning data from a local collection.
*/
-private[sql] case class LocalTableScan(
+private[sql] case class LocalTableScanExec(
output: Seq[Attribute],
- rows: Seq[InternalRow]) extends LeafNode {
+ rows: Seq[InternalRow]) extends LeafExecNode {
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index a444a70302..bb83676b7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, HiveNativeCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
@@ -107,7 +107,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
* execution is simply passed back to Hive.
*/
def hiveResultString(): Seq[String] = executedPlan match {
- case ExecutedCommand(desc: DescribeTableCommand) =>
+ case ExecutedCommandExec(desc: DescribeTableCommand) =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
desc.run(sqlContext).map {
@@ -117,7 +117,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
.map(s => String.format(s"%-20s", s))
.mkString("\t")
}
- case command: ExecutedCommand =>
+ case command: ExecutedCommandExec =>
command.executeCollect().map(_.getString(0))
case other =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 04a39a126e..0e4d6d72c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -36,12 +36,12 @@ import org.apache.spark.util.collection.unsafe.sort.RadixSort;
* @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
* spill every `frequency` records.
*/
-case class Sort(
+case class SortExec(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan,
testSpillFrequency: Int = 0)
- extends UnaryNode with CodegenSupport {
+ extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 64d89f238c..e28e456662 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -41,6 +41,8 @@ import org.apache.spark.util.ThreadUtils
/**
* The base class for physical operators.
+ *
+ * The naming convention is that physical operators end with "Exec" suffix, e.g. [[ProjectExec]].
*/
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
@@ -392,19 +394,19 @@ object SparkPlan {
ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
}
-private[sql] trait LeafNode extends SparkPlan {
+private[sql] trait LeafExecNode extends SparkPlan {
override def children: Seq[SparkPlan] = Nil
override def producedAttributes: AttributeSet = outputSet
}
-object UnaryNode {
+object UnaryExecNode {
def unapply(a: Any): Option[(SparkPlan, SparkPlan)] = a match {
case s: SparkPlan if s.children.size == 1 => Some((s, s.children.head))
case _ => None
}
}
-private[sql] trait UnaryNode extends SparkPlan {
+private[sql] trait UnaryExecNode extends SparkPlan {
def child: SparkPlan
override def children: Seq[SparkPlan] = child :: Nil
@@ -412,7 +414,7 @@ private[sql] trait UnaryNode extends SparkPlan {
override def outputPartitioning: Partitioning = child.outputPartitioning
}
-private[sql] trait BinaryNode extends SparkPlan {
+private[sql] trait BinaryExecNode extends SparkPlan {
def left: SparkPlan
def right: SparkPlan
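The SparkPlan.scala hunk above renames the base traits to LeafExecNode, UnaryExecNode and BinaryExecNode. As a hedged sketch of the resulting convention, here is a hypothetical single-child operator (PassThroughExec is not part of this patch); it assumes compilation inside Spark's sql/core module, since UnaryExecNode is private[sql].

package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical operator following the new convention: the class name ends in "Exec",
// while nodeName (see the TreeNode change) renders it as "PassThrough" in plan output.
case class PassThroughExec(child: SparkPlan) extends UnaryExecNode {
  // The schema is unchanged; this operator simply forwards its child's rows.
  override def output: Seq[Attribute] = child.output
  protected override def doExecute(): RDD[InternalRow] = child.execute()
}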
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 247f55da1d..cb4b1cfeb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.exchange.ReusedExchange
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.util.Utils
@@ -51,7 +51,7 @@ private[sql] object SparkPlanInfo {
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val children = plan match {
- case ReusedExchange(_, child) => child :: Nil
+ case ReusedExchangeExec(_, child) => child :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 8d05ae470d..0afa4c7bb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -82,10 +82,10 @@ class SparkPlanner(
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
- filterCondition.map(Filter(_, scan)).getOrElse(scan)
+ filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
} else {
val scan = scanBuilder((projectSet ++ filterSet).toSeq)
- Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
+ ProjectExec(projectList, filterCondition.map(FilterExec(_, scan)).getOrElse(scan))
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ed6b846fcf..3ce5f28bf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution
-import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
+import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -44,20 +44,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ReturnAnswer(rootPlan) => rootPlan match {
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
- execution.TakeOrderedAndProject(limit, order, None, planLater(child)) :: Nil
+ execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit),
logical.Project(projectList, logical.Sort(order, true, child))) =>
- execution.TakeOrderedAndProject(limit, order, Some(projectList), planLater(child)) :: Nil
+ execution.TakeOrderedAndProjectExec(
+ limit, order, Some(projectList), planLater(child)) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
- execution.CollectLimit(limit, planLater(child)) :: Nil
+ execution.CollectLimitExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
}
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
- execution.TakeOrderedAndProject(limit, order, None, planLater(child)) :: Nil
+ execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
- execution.TakeOrderedAndProject(limit, order, Some(projectList), planLater(child)) :: Nil
+ execution.TakeOrderedAndProjectExec(
+ limit, order, Some(projectList), planLater(child)) :: Nil
case _ => Nil
}
}
@@ -66,12 +68,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(
LeftExistence(jt), leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
- Seq(joins.BroadcastHashJoin(
+ Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, jt, BuildRight, condition, planLater(left), planLater(right)))
// Find left semi joins where at least some predicates can be evaluated by matching join keys
case ExtractEquiJoinKeys(
LeftExistence(jt), leftKeys, rightKeys, condition, left, right) =>
- Seq(joins.ShuffledHashJoin(
+ Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, jt, BuildRight, condition, planLater(left), planLater(right)))
case _ => Nil
}
@@ -146,11 +148,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// --- Inner joins --------------------------------------------------------------------------
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
- Seq(joins.BroadcastHashJoin(
+ Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
- Seq(joins.BroadcastHashJoin(
+ Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
@@ -162,41 +164,41 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
} else {
BuildLeft
}
- Seq(joins.ShuffledHashJoin(
+ Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, Inner, buildSide, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
- joins.SortMergeJoin(
+ joins.SortMergeJoinExec(
leftKeys, rightKeys, Inner, condition, planLater(left), planLater(right)) :: Nil
// --- Outer joins --------------------------------------------------------------------------
case ExtractEquiJoinKeys(
LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
- Seq(joins.BroadcastHashJoin(
+ Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(
RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
- Seq(joins.BroadcastHashJoin(
+ Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildHashMap(right) && muchSmaller(right, left) ||
!RowOrdering.isOrderable(leftKeys) =>
- Seq(joins.ShuffledHashJoin(
+ Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(RightOuter, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildHashMap(left) && muchSmaller(left, right) ||
!RowOrdering.isOrderable(leftKeys) =>
- Seq(joins.ShuffledHashJoin(
+ Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
- joins.SortMergeJoin(
+ joins.SortMergeJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
@@ -278,10 +280,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object BroadcastNestedLoop extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) =>
- execution.joins.BroadcastNestedLoopJoin(
+ execution.joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil
case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) =>
- execution.joins.BroadcastNestedLoopJoin(
+ execution.joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil
case _ => Nil
}
@@ -290,10 +292,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object CartesianProduct extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Join(left, right, Inner, None) =>
- execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
+ execution.joins.CartesianProductExec(planLater(left), planLater(right)) :: Nil
case logical.Join(left, right, Inner, Some(condition)) =>
- execution.Filter(condition,
- execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
+ execution.FilterExec(condition,
+ execution.joins.CartesianProductExec(planLater(left), planLater(right))) :: Nil
case _ => Nil
}
}
@@ -308,7 +310,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
joins.BuildLeft
}
// This join could be very slow or even hang forever
- joins.BroadcastNestedLoopJoin(
+ joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case _ => Nil
}
@@ -323,7 +325,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
projectList,
filters,
identity[Seq[Expression]], // All filters still need to be evaluated.
- InMemoryColumnarTableScan(_, filters, mem)) :: Nil
+ InMemoryTableScanExec(_, filters, mem)) :: Nil
case _ => Nil
}
}
@@ -333,11 +335,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def numPartitions: Int = self.numPartitions
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case r: RunnableCommand => ExecutedCommand(r) :: Nil
+ case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
- LocalTableScan(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
+ LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
case logical.Distinct(child) =>
throw new IllegalStateException(
@@ -349,19 +351,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.DeserializeToObject(deserializer, objAttr, child) =>
execution.DeserializeToObject(deserializer, objAttr, planLater(child)) :: Nil
case logical.SerializeFromObject(serializer, child) =>
- execution.SerializeFromObject(serializer, planLater(child)) :: Nil
+ execution.SerializeFromObjectExec(serializer, planLater(child)) :: Nil
case logical.MapPartitions(f, objAttr, child) =>
- execution.MapPartitions(f, objAttr, planLater(child)) :: Nil
+ execution.MapPartitionsExec(f, objAttr, planLater(child)) :: Nil
case logical.MapElements(f, objAttr, child) =>
- execution.MapElements(f, objAttr, planLater(child)) :: Nil
+ execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
case logical.AppendColumns(f, in, out, child) =>
- execution.AppendColumns(f, in, out, planLater(child)) :: Nil
+ execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil
case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>
- execution.AppendColumnsWithObject(f, childSer, newSer, planLater(child)) :: Nil
+ execution.AppendColumnsWithObjectExec(f, childSer, newSer, planLater(child)) :: Nil
case logical.MapGroups(f, key, value, grouping, data, objAttr, child) =>
- execution.MapGroups(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil
+ execution.MapGroupsExec(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil
case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, left, right) =>
- execution.CoGroup(
+ execution.CoGroupExec(
f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,
planLater(left), planLater(right)) :: Nil
@@ -369,45 +371,45 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if (shuffle) {
ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
} else {
- execution.Coalesce(numPartitions, planLater(child)) :: Nil
+ execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
}
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
- execution.Sort(sortExprs, global = false, child = planLater(child)) :: Nil
+ execution.SortExec(sortExprs, global = false, child = planLater(child)) :: Nil
case logical.Sort(sortExprs, global, child) =>
- execution.Sort(sortExprs, global, planLater(child)) :: Nil
+ execution.SortExec(sortExprs, global, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
- execution.Project(projectList, planLater(child)) :: Nil
+ execution.ProjectExec(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
- execution.Filter(condition, planLater(child)) :: Nil
+ execution.FilterExec(condition, planLater(child)) :: Nil
case e @ logical.Expand(_, _, child) =>
- execution.Expand(e.projections, e.output, planLater(child)) :: Nil
+ execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>
- execution.Window(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
+ execution.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
- execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
+ execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
- LocalTableScan(output, data) :: Nil
+ LocalTableScanExec(output, data) :: Nil
case logical.LocalLimit(IntegerLiteral(limit), child) =>
- execution.LocalLimit(limit, planLater(child)) :: Nil
+ execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
- execution.GlobalLimit(limit, planLater(child)) :: Nil
+ execution.GlobalLimitExec(limit, planLater(child)) :: Nil
case logical.Union(unionChildren) =>
- execution.Union(unionChildren.map(planLater)) :: Nil
+ execution.UnionExec(unionChildren.map(planLater)) :: Nil
case logical.Except(left, right) =>
- execution.Except(planLater(left), planLater(right)) :: Nil
+ execution.ExceptExec(planLater(left), planLater(right)) :: Nil
case g @ logical.Generate(generator, join, outer, _, _, child) =>
- execution.Generate(
+ execution.GenerateExec(
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
case logical.OneRowRelation =>
- execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil
+ execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
case r @ logical.Range(start, end, step, numSlices, output) =>
- execution.Range(start, step, numSlices, r.numElements, output) :: Nil
+ execution.RangeExec(start, step, numSlices, r.numElements, output) :: Nil
case logical.RepartitionByExpression(expressions, child, nPartitions) =>
exchange.ShuffleExchange(HashPartitioning(
expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
- case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") :: Nil
+ case LogicalRDD(output, rdd) => RDDScanExec(output, rdd, "ExistingRDD") :: Nil
case BroadcastHint(child) => planLater(child) :: Nil
case _ => Nil
}
@@ -416,7 +418,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(tableIdent, userSpecifiedSchema, provider, true, opts, false, _) =>
- ExecutedCommand(
+ ExecutedCommandExec(
CreateTempTableUsing(
tableIdent, userSpecifiedSchema, provider, opts)) :: Nil
case c: CreateTableUsing if !c.temporary =>
@@ -430,15 +432,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case c: CreateTableUsingAsSelect if c.temporary =>
val cmd = CreateTempTableUsingAsSelect(
c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
- ExecutedCommand(cmd) :: Nil
+ ExecutedCommandExec(cmd) :: Nil
case c: CreateTableUsingAsSelect if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
case logical.ShowFunctions(db, pattern) =>
- ExecutedCommand(ShowFunctions(db, pattern)) :: Nil
+ ExecutedCommandExec(ShowFunctions(db, pattern)) :: Nil
case logical.DescribeFunction(function, extended) =>
- ExecutedCommand(DescribeFunction(function, extended)) :: Nil
+ ExecutedCommandExec(DescribeFunction(function, extended)) :: Nil
case _ => Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 944962b1c8..6a03bd08c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.toCommentSafeString
import org.apache.spark.sql.execution.aggregate.TungstenAggregate
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -39,10 +39,10 @@ trait CodegenSupport extends SparkPlan {
/** Prefix used in the current operator's variable names. */
private def variablePrefix: String = this match {
case _: TungstenAggregate => "agg"
- case _: BroadcastHashJoin => "bhj"
- case _: SortMergeJoin => "smj"
- case _: PhysicalRDD => "rdd"
- case _: DataSourceScan => "scan"
+ case _: BroadcastHashJoinExec => "bhj"
+ case _: SortMergeJoinExec => "smj"
+ case _: RDDScanExec => "rdd"
+ case _: DataSourceScanExec => "scan"
case _ => nodeName.toLowerCase
}
@@ -219,7 +219,7 @@ trait CodegenSupport extends SparkPlan {
* This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes
* an RDD iterator of InternalRow.
*/
-case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport {
+case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = child.outputPartitioning
@@ -256,7 +256,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
override def treeChildren: Seq[SparkPlan] = Nil
}
-object WholeStageCodegen {
+object WholeStageCodegenExec {
val PIPELINE_DURATION_METRIC = "duration"
}
@@ -288,7 +288,7 @@ object WholeStageCodegen {
* doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
* used to generated code for BoundReference.
*/
-case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport {
+case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = child.outputPartitioning
@@ -296,7 +296,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
override private[sql] lazy val metrics = Map(
"pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext,
- WholeStageCodegen.PIPELINE_DURATION_METRIC))
+ WholeStageCodegenExec.PIPELINE_DURATION_METRIC))
/**
* Generates code for this subtree.
@@ -457,7 +457,7 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
* Inserts a InputAdapter on top of those that do not support codegen.
*/
private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
- case j @ SortMergeJoin(_, _, _, _, left, right) if j.supportCodegen =>
+ case j @ SortMergeJoinExec(_, _, _, _, left, right) if j.supportCodegen =>
// The children of SortMergeJoin should do codegen separately.
j.copy(left = InputAdapter(insertWholeStageCodegen(left)),
right = InputAdapter(insertWholeStageCodegen(right)))
@@ -477,7 +477,7 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
case plan: CodegenSupport if supportCodegen(plan) =>
- WholeStageCodegen(insertInputAdapter(plan))
+ WholeStageCodegenExec(insertInputAdapter(plan))
case other =>
other.withNewChildren(other.children.map(insertWholeStageCodegen))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
index a46d0e0ba7..97bbab65af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
@@ -80,12 +80,12 @@ import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, Unsaf
* Entire Partition, Sliding, Growing & Shrinking. Boundary evaluation is also delegated to a pair
* of specialized classes: [[RowBoundOrdering]] & [[RangeBoundOrdering]].
*/
-case class Window(
+case class WindowExec(
windowExpression: Seq[NamedExpression],
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
child: SparkPlan)
- extends UnaryNode {
+ extends UnaryExecNode {
override def output: Seq[Attribute] =
child.output ++ windowExpression.map(_.toAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
index 9fcfea8381..3169e0a2fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
@@ -23,10 +23,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.SQLMetrics
-case class SortBasedAggregate(
+case class SortBasedAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
@@ -34,7 +34,7 @@ case class SortBasedAggregate(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
- extends UnaryNode {
+ extends UnaryExecNode {
private[this] val aggregateBufferAttributes = {
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 49b682a951..782da0ea60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -38,7 +38,7 @@ case class TungstenAggregate(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
- extends UnaryNode with CodegenSupport {
+ extends UnaryExecNode with CodegenSupport {
private[this] val aggregateBufferAttributes = {
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
index 4682949fa1..f93c446007 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.{StateStoreRestore, StateStoreSave}
+import org.apache.spark.sql.execution.streaming.{StateStoreRestoreExec, StateStoreSaveExec}
/**
* Utility functions used by the query planner to convert our plan to new aggregation code path.
@@ -35,7 +35,7 @@ object Utils {
val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute)
- SortBasedAggregate(
+ SortBasedAggregateExec(
requiredChildDistributionExpressions = Some(groupingExpressions),
groupingExpressions = groupingExpressions,
aggregateExpressions = completeAggregateExpressions,
@@ -66,7 +66,7 @@ object Utils {
resultExpressions = resultExpressions,
child = child)
} else {
- SortBasedAggregate(
+ SortBasedAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
groupingExpressions = groupingExpressions,
aggregateExpressions = aggregateExpressions,
@@ -295,7 +295,7 @@ object Utils {
child = partialAggregate)
}
- val restored = StateStoreRestore(groupingAttributes, None, partialMerged1)
+ val restored = StateStoreRestoreExec(groupingAttributes, None, partialMerged1)
val partialMerged2: SparkPlan = {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = PartialMerge))
@@ -312,7 +312,7 @@ object Utils {
child = restored)
}
- val saved = StateStoreSave(groupingAttributes, None, partialMerged2)
+ val saved = StateStoreSaveExec(groupingAttributes, None, partialMerged2)
val finalAndCompleteAggregate: SparkPlan = {
val finalAggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Final))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 892c57ae7d..83f527f555 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -20,14 +20,15 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.LongType
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
-case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
- extends UnaryNode with CodegenSupport {
+/** Physical plan for Project. */
+case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
+ extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
@@ -74,8 +75,9 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
}
-case class Filter(condition: Expression, child: SparkPlan)
- extends UnaryNode with CodegenSupport with PredicateHelper {
+/** Physical plan for Filter. */
+case class FilterExec(condition: Expression, child: SparkPlan)
+ extends UnaryExecNode with CodegenSupport with PredicateHelper {
// Split out all the IsNotNulls from condition.
private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
@@ -209,7 +211,7 @@ case class Filter(condition: Expression, child: SparkPlan)
}
/**
- * Sample the dataset.
+ * Physical plan for sampling the dataset.
*
* @param lowerBound Lower-bound of the sampling probability (usually 0.0)
* @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
@@ -218,12 +220,12 @@ case class Filter(condition: Expression, child: SparkPlan)
* @param seed the random seed
* @param child the SparkPlan
*/
-case class Sample(
+case class SampleExec(
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Long,
- child: SparkPlan) extends UnaryNode with CodegenSupport {
+ child: SparkPlan) extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
private[sql] override lazy val metrics = Map(
@@ -301,13 +303,23 @@ case class Sample(
}
}
-case class Range(
+
+/**
+ * Physical plan for range (generating a range of 64 bit numbers).
+ *
+ * @param start first number in the range, inclusive.
+ * @param step size of the step increment.
+ * @param numSlices number of partitions.
+ * @param numElements total number of elements to output.
+ * @param output output attributes.
+ */
+case class RangeExec(
start: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
output: Seq[Attribute])
- extends LeafNode with CodegenSupport {
+ extends LeafExecNode with CodegenSupport {
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
@@ -449,9 +461,9 @@ case class Range(
}
/**
- * Union two plans, without a distinct. This is UNION ALL in SQL.
+ * Physical plan for unioning two plans, without a distinct. This is UNION ALL in SQL.
*/
-case class Union(children: Seq[SparkPlan]) extends SparkPlan {
+case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
override def output: Seq[Attribute] =
children.map(_.output).transpose.map(attrs =>
attrs.head.withNullability(attrs.exists(_.nullable)))
@@ -461,12 +473,12 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
}
/**
- * Return a new RDD that has exactly `numPartitions` partitions.
+ * Physical plan for returning a new RDD that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
* the 100 new partitions will claim 10 of the current partitions.
*/
-case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
+case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = {
@@ -480,10 +492,10 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
}
/**
- * Returns a table with the elements from left that are not in right using
+ * Physical plan for returning a table with the elements from left that are not in right using
* the built-in spark subtract function.
*/
-case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
+case class ExceptExec(left: SparkPlan, right: SparkPlan) extends BinaryExecNode {
override def output: Seq[Attribute] = left.output
protected override def doExecute(): RDD[InternalRow] = {
@@ -496,18 +508,18 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
* (hopefully structurally equivalent) tree from a different optimization sequence into an already
* resolved tree.
*/
-case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
+case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children: Seq[SparkPlan] = child :: Nil
protected override def doExecute(): RDD[InternalRow] = child.execute()
}
/**
- * A plan as subquery.
+ * Physical plan for a subquery.
*
* This is used to generate tree string for SparkScalarSubquery.
*/
-case class Subquery(name: String, child: SparkPlan) extends UnaryNode {
+case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 1f964b1fc1..cb957b9666 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.storage.StorageLevel
@@ -210,11 +210,11 @@ private[sql] case class InMemoryRelation(
}
}
-private[sql] case class InMemoryColumnarTableScan(
+private[sql] case class InMemoryTableScanExec(
attributes: Seq[Attribute],
predicates: Seq[Expression],
@transient relation: InMemoryRelation)
- extends LeafNode {
+ extends LeafExecNode {
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 45a32131b6..971770a97b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -42,7 +42,7 @@ private[sql] trait RunnableCommand extends LogicalPlan with logical.Command {
* A physical operator that executes the run method of a `RunnableCommand` and
* saves the result to prevent multiple executions.
*/
-private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
+private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index ac3c52e901..9bebd74b4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -32,8 +32,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.DataSourceScan.{INPUT_PATHS, PUSHED_FILTERS}
-import org.apache.spark.sql.execution.command.ExecutedCommand
+import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -105,12 +105,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
- execution.DataSourceScan.create(
+ execution.DataSourceScanExec.create(
l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
part, query, overwrite, false) if part.isEmpty =>
- ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
+ ExecutedCommandExec(InsertIntoDataSource(l, query, overwrite)) :: Nil
case _ => Nil
}
@@ -214,22 +214,22 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)
- val scan = execution.DataSourceScan.create(
+ val scan = execution.DataSourceScanExec.create(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
- filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
+ filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
- val scan = execution.DataSourceScan.create(
+ val scan = execution.DataSourceScanExec.create(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
- execution.Project(
- projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
+ execution.ProjectExec(
+ projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index c1a97de72f..751daa0fe2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{DataSourceScan, SparkPlan}
+import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -192,7 +192,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
}
val scan =
- DataSourceScan.create(
+ DataSourceScanExec.create(
readDataColumns ++ partitionColumns,
new FileScanRDD(
files.sqlContext,
@@ -205,11 +205,11 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
"ReadSchema" -> prunedDataSchema.simpleString))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
- val withFilter = afterScanFilter.map(execution.Filter(_, scan)).getOrElse(scan)
+ val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
val withProjections = if (projects == withFilter.output) {
withFilter
} else {
- execution.Project(projects, withFilter)
+ execution.ProjectExec(projects, withFilter)
}
withProjections :: Nil
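
Both DataSourceStrategy and FileSourceStrategy above end with the same composition idiom: optionally wrap the scan in a FilterExec for any residual predicate, then add a ProjectExec only when the requested columns differ from what the scan already produces. A minimal, self-contained sketch of that idiom over toy Plan/Scan/Filter/Project types (illustrative only, not Spark's classes):

sealed trait Plan { def output: Seq[String] }
case class Scan(output: Seq[String]) extends Plan
case class Filter(condition: String, child: Plan) extends Plan { def output: Seq[String] = child.output }
case class Project(output: Seq[String], child: Plan) extends Plan

object ComposeExample {
  def plan(scan: Scan, filterCondition: Option[String], projects: Seq[String]): Plan = {
    // Wrap in a filter only when a residual (non-pushed-down) condition remains.
    val withFilter = filterCondition.map(Filter(_, scan)).getOrElse(scan)
    // Add a projection only when the requested columns differ from the child's output.
    if (projects == withFilter.output) withFilter else Project(projects, withFilter)
  }

  def main(args: Array[String]): Unit = {
    println(plan(Scan(Seq("a", "b", "c")), Some("a > 1"), Seq("a", "b"))) // Project wrapping Filter wrapping Scan
    println(plan(Scan(Seq("a", "b")), None, Seq("a", "b")))               // Scan returned unchanged
  }
}
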
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index e6079ecaad..5b96ab10c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -49,9 +49,9 @@ package object debug {
}
def codegenString(plan: SparkPlan): String = {
- val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegen]()
+ val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]()
plan transform {
- case s: WholeStageCodegen =>
+ case s: WholeStageCodegenExec =>
codegenSubtrees += s
s
case s => s
@@ -86,11 +86,11 @@ package object debug {
val debugPlan = plan transform {
case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
visited += new TreeNodeRef(s)
- DebugNode(s)
+ DebugExec(s)
}
debugPrint(s"Results returned: ${debugPlan.execute().count()}")
debugPlan.foreach {
- case d: DebugNode => d.dumpStats()
+ case d: DebugExec => d.dumpStats()
case _ =>
}
}
@@ -104,7 +104,7 @@ package object debug {
}
}
- private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode with CodegenSupport {
+ private[sql] case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
def output: Seq[Attribute] = child.output
implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
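
The codegenString helper above uses the common "walk the plan and gather every node of one type" pattern, here for WholeStageCodegenExec subtrees. A minimal, self-contained sketch of that pattern over a toy tree (the Node/Codegen/Plain types are stand-ins, not Spark classes):

sealed trait Node { def children: Seq[Node] }
case class Codegen(children: Seq[Node]) extends Node
case class Plain(name: String, children: Seq[Node] = Nil) extends Node

object CollectExample {
  // Gather every Codegen node in the tree, depth-first.
  def collectCodegen(node: Node): Seq[Codegen] = {
    val here = node match {
      case c: Codegen => Seq(c)
      case _          => Seq.empty[Codegen]
    }
    here ++ node.children.flatMap(collectCodegen)
  }

  def main(args: Array[String]): Unit = {
    val plan = Plain("root", Seq(Codegen(Seq(Plain("scan"))), Plain("exchange", Seq(Codegen(Nil)))))
    println(collectCodegen(plan).size) // 2
  }
}
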
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 87a113ee1c..573ca195ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -30,10 +30,10 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.ThreadUtils
/**
- * A [[BroadcastExchange]] collects, transforms and finally broadcasts the result of a transformed
- * SparkPlan.
+ * A [[BroadcastExchangeExec]] collects, transforms and finally broadcasts the result of
+ * a transformed SparkPlan.
*/
-case class BroadcastExchange(
+case class BroadcastExchangeExec(
mode: BroadcastMode,
child: SparkPlan) extends Exchange {
@@ -46,7 +46,7 @@ case class BroadcastExchange(
override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
override def sameResult(plan: SparkPlan): Boolean = plan match {
- case p: BroadcastExchange =>
+ case p: BroadcastExchangeExec =>
mode.compatibleWith(p.mode) && child.sameResult(p.child)
case _ => false
}
@@ -85,7 +85,7 @@ case class BroadcastExchange(
longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
broadcasted
}
- }(BroadcastExchange.executionContext)
+ }(BroadcastExchangeExec.executionContext)
}
override protected def doPrepare(): Unit = {
@@ -103,7 +103,7 @@ case class BroadcastExchange(
}
}
-object BroadcastExchange {
+object BroadcastExchangeExec {
private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}
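
The companion object renamed above is the sole owner of the thread pool that runs the asynchronous broadcast, which is why it has to be renamed together with the case class. A minimal, self-contained sketch of that companion-object pattern using plain java.util.concurrent instead of Spark's ThreadUtils (all names are stand-ins):

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

case class BroadcastLikeOperator(payload: Seq[Int]) {
  // Submit the "collect and broadcast" work to the pool owned by the companion object.
  def broadcastAsync(): Future[Seq[Int]] =
    Future(payload.map(_ * 2))(BroadcastLikeOperator.executionContext)
}

object BroadcastLikeOperator {
  // One bounded pool shared by every instance, analogous to the "broadcast-exchange" pool above.
  val executionContext: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
}

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    println(Await.result(BroadcastLikeOperator(Seq(1, 2, 3)).broadcastAsync(), 10.seconds)) // List(2, 4, 6)
  }
}
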
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 4864db7f2a..446571aa84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -160,7 +160,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
child
case (child, BroadcastDistribution(mode)) =>
- BroadcastExchange(mode, child)
+ BroadcastExchangeExec(mode, child)
case (child, distribution) =>
ShuffleExchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
}
@@ -237,7 +237,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
if (requiredOrdering.nonEmpty) {
// If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
- Sort(requiredOrdering, global = false, child = child)
+ SortExec(requiredOrdering, global = false, child = child)
} else {
child
}
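
The comment in the hunk above captures the key check: if the child already sorts by [a, b], a required ordering of [a] is satisfied and no SortExec is added. A minimal, self-contained sketch of just that prefix test (column names modeled as strings, not Spark's SortOrder):

object OrderingExample {
  // An extra sort is needed only when the required ordering is not already a
  // prefix of the child's output ordering.
  def needsSort(childOrdering: Seq[String], requiredOrdering: Seq[String]): Boolean =
    requiredOrdering.nonEmpty &&
      requiredOrdering != childOrdering.take(requiredOrdering.length)

  def main(args: Array[String]): Unit = {
    println(needsSort(childOrdering = Seq("a", "b"), requiredOrdering = Seq("a")))      // false
    println(needsSort(childOrdering = Seq("b", "a"), requiredOrdering = Seq("a")))      // true
    println(needsSort(childOrdering = Seq.empty,     requiredOrdering = Seq("a", "b"))) // true
  }
}
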
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index df7ad48812..9da9df6174 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types.StructType
* differs significantly, the concept is similar to the exchange operator described in
* "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
*/
-abstract class Exchange extends UnaryNode {
+abstract class Exchange extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
}
@@ -45,7 +45,8 @@ abstract class Exchange extends UnaryNode {
* logically identical output will have distinct sets of output attribute ids, so we need to
* preserve the original ids because they're what downstream operators are expecting.
*/
-case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode {
+case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchange)
+ extends LeafExecNode {
override def sameResult(plan: SparkPlan): Boolean = {
// Ignore this wrapper. `plan` could also be a ReusedExchange, so we reverse the order here.
@@ -86,7 +87,7 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
if (samePlan.isDefined) {
// Keep the output of this exchange, the following plans require that to resolve
// attributes.
- ReusedExchange(exchange.output, samePlan.get)
+ ReusedExchangeExec(exchange.output, samePlan.get)
} else {
sameSchema += exchange
exchange
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 89487c6b87..51399e1830 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, GenerateUnsafeProjection}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, Partitioning, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.{BinaryNode, CodegenSupport, SparkPlan}
+import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.LongType
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types.LongType
* broadcast relation. This data is then placed in a Spark broadcast variable. The streamed
* relation is not shuffled.
*/
-case class BroadcastHashJoin(
+case class BroadcastHashJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
joinType: JoinType,
@@ -43,7 +43,7 @@ case class BroadcastHashJoin(
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
- extends BinaryNode with HashJoin with CodegenSupport {
+ extends BinaryExecNode with HashJoin with CodegenSupport {
override private[sql] lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 4ba710c10a..51afa0017d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -23,16 +23,16 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.collection.{BitSet, CompactBuffer}
-case class BroadcastNestedLoopJoin(
+case class BroadcastNestedLoopJoinExec(
left: SparkPlan,
right: SparkPlan,
buildSide: BuildSide,
joinType: JoinType,
- condition: Option[Expression]) extends BinaryNode {
+ condition: Option[Expression]) extends BinaryExecNode {
override private[sql] lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index b1de52b5f4..3ce7c0e315 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -22,7 +22,7 @@ import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
@@ -79,7 +79,7 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
}
-case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
+case class CartesianProductExec(left: SparkPlan, right: SparkPlan) extends BinaryExecNode {
override def output: Seq[Attribute] = left.output ++ right.output
override private[sql] lazy val metrics = Map(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 785373b225..68cd3cb49c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, UnsafeRow}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* Performs a hash join of two child relations by first shuffling the data using the join keys.
*/
-case class ShuffledHashJoin(
+case class ShuffledHashJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
joinType: JoinType,
@@ -37,7 +37,7 @@ case class ShuffledHashJoin(
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
- extends BinaryNode with HashJoin {
+ extends BinaryExecNode with HashJoin {
override private[sql] lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 4e45fd6560..96b283a5e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -25,20 +25,20 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{BinaryNode, CodegenSupport, RowIterator, SparkPlan}
+import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, RowIterator, SparkPlan}
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
import org.apache.spark.util.collection.BitSet
/**
 * Performs a sort merge join of two child relations.
*/
-case class SortMergeJoin(
+case class SortMergeJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
joinType: JoinType,
condition: Option[Expression],
left: SparkPlan,
- right: SparkPlan) extends BinaryNode with CodegenSupport {
+ right: SparkPlan) extends BinaryExecNode with CodegenSupport {
override private[sql] lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
@@ -466,7 +466,7 @@ case class SortMergeJoin(
}
/**
- * Helper class that is used to implement [[SortMergeJoin]].
+ * Helper class that is used to implement [[SortMergeJoinExec]].
*
* To perform an inner (outer) join, users of this class call [[findNextInnerJoinRows()]]
* ([[findNextOuterJoinRows()]]), which returns `true` if a result has been produced and `false`
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index c9a14593fb..b71f3335c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange
 * This operator will be used when a logical `Limit` operation is the final operator in a
* logical plan, which happens when the user is collecting results back to the driver.
*/
-case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
@@ -46,9 +46,10 @@ case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
}
/**
- * Helper trait which defines methods that are shared by both [[LocalLimit]] and [[GlobalLimit]].
+ * Helper trait which defines methods that are shared by both
+ * [[LocalLimitExec]] and [[GlobalLimitExec]].
*/
-trait BaseLimit extends UnaryNode with CodegenSupport {
+trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
val limit: Int
override def output: Seq[Attribute] = child.output
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
@@ -91,29 +92,29 @@ trait BaseLimit extends UnaryNode with CodegenSupport {
/**
* Take the first `limit` elements of each child partition, but do not collect or shuffle them.
*/
-case class LocalLimit(limit: Int, child: SparkPlan) extends BaseLimit {
+case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
}
/**
* Take the first `limit` elements of the child's single output partition.
*/
-case class GlobalLimit(limit: Int, child: SparkPlan) extends BaseLimit {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
}
/**
* Take the first limit elements as defined by the sortOrder, and do projection if needed.
- * This is logically equivalent to having a Limit operator after a [[Sort]] operator,
- * or having a [[Project]] operator between them.
+ * This is logically equivalent to having a Limit operator after a [[SortExec]] operator,
+ * or having a [[ProjectExec]] operator between them.
* This could have been named TopK, but Spark's top operator does the opposite in ordering
* so we name it TakeOrdered to avoid confusion.
*/
-case class TakeOrderedAndProject(
+case class TakeOrderedAndProjectExec(
limit: Int,
sortOrder: Seq[SortOrder],
projectList: Option[Seq[NamedExpression]],
- child: SparkPlan) extends UnaryNode {
+ child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = {
projectList.map(_.map(_.toAttribute)).getOrElse(child.output)
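
The LocalLimitExec/GlobalLimitExec split documented above amounts to: truncate each partition first (no shuffle), then truncate the merged, single-partition result once more. A minimal, self-contained sketch with partitions modeled as nested sequences (not Spark code):

object LimitExample {
  def localLimit[A](partitions: Seq[Seq[A]], limit: Int): Seq[Seq[A]] =
    partitions.map(_.take(limit)) // take the first `limit` rows of every partition independently

  def globalLimit[A](partitions: Seq[Seq[A]], limit: Int): Seq[A] =
    partitions.flatten.take(limit) // final truncation over the merged, single "partition"

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6))
    println(localLimit(parts, 2))                  // List(List(1, 2), List(4, 5), List(6))
    println(globalLimit(localLimit(parts, 2), 2))  // List(1, 2)
  }
}
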
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 7c8bc7fed8..56a3906951 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.{DataType, ObjectType}
case class DeserializeToObject(
deserializer: Expression,
outputObjAttr: Attribute,
- child: SparkPlan) extends UnaryNode with CodegenSupport {
+ child: SparkPlan) extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = outputObjAttr :: Nil
override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
@@ -67,9 +67,9 @@ case class DeserializeToObject(
 * Takes the input object from child and turns it into an unsafe row using the given serializer
* expression. The output of its child must be a single-field row containing the input object.
*/
-case class SerializeFromObject(
+case class SerializeFromObjectExec(
serializer: Seq[NamedExpression],
- child: SparkPlan) extends UnaryNode with CodegenSupport {
+ child: SparkPlan) extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = serializer.map(_.toAttribute)
@@ -136,10 +136,11 @@ trait ObjectOperator extends SparkPlan {
* Applies the given function to input object iterator.
* The output of its child must be a single-field row containing the input object.
*/
-case class MapPartitions(
+case class MapPartitionsExec(
func: Iterator[Any] => Iterator[Any],
outputObjAttr: Attribute,
- child: SparkPlan) extends UnaryNode with ObjectOperator {
+ child: SparkPlan)
+ extends UnaryExecNode with ObjectOperator {
override def output: Seq[Attribute] = outputObjAttr :: Nil
override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
@@ -157,13 +158,14 @@ case class MapPartitions(
* Applies the given function to each input object.
* The output of its child must be a single-field row containing the input object.
*
- * This operator is kind of a safe version of [[Project]], as it's output is custom object, we need
- * to use safe row to contain it.
+ * This operator is kind of a safe version of [[ProjectExec]], as its output is a custom object,
+ * we need to use a safe row to contain it.
*/
-case class MapElements(
+case class MapElementsExec(
func: AnyRef,
outputObjAttr: Attribute,
- child: SparkPlan) extends UnaryNode with ObjectOperator with CodegenSupport {
+ child: SparkPlan)
+ extends UnaryExecNode with ObjectOperator with CodegenSupport {
override def output: Seq[Attribute] = outputObjAttr :: Nil
override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
@@ -211,11 +213,11 @@ case class MapElements(
/**
* Applies the given function to each input row, appending the encoded result at the end of the row.
*/
-case class AppendColumns(
+case class AppendColumnsExec(
func: Any => Any,
deserializer: Expression,
serializer: Seq[NamedExpression],
- child: SparkPlan) extends UnaryNode with ObjectOperator {
+ child: SparkPlan) extends UnaryExecNode with ObjectOperator {
override def output: Seq[Attribute] = child.output ++ serializer.map(_.toAttribute)
@@ -236,13 +238,14 @@ case class AppendColumns(
}
/**
- * An optimized version of [[AppendColumns]], that can be executed on deserialized object directly.
+ * An optimized version of [[AppendColumnsExec]] that can be executed
+ * on deserialized objects directly.
*/
-case class AppendColumnsWithObject(
+case class AppendColumnsWithObjectExec(
func: Any => Any,
inputSerializer: Seq[NamedExpression],
newColumnsSerializer: Seq[NamedExpression],
- child: SparkPlan) extends UnaryNode with ObjectOperator {
+ child: SparkPlan) extends UnaryExecNode with ObjectOperator {
override def output: Seq[Attribute] = (inputSerializer ++ newColumnsSerializer).map(_.toAttribute)
@@ -269,14 +272,14 @@ case class AppendColumnsWithObject(
* Groups the input rows together and calls the function with each group and an iterator containing
* all elements in the group. The result of this function is flattened before being output.
*/
-case class MapGroups(
+case class MapGroupsExec(
func: (Any, Iterator[Any]) => TraversableOnce[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
outputObjAttr: Attribute,
- child: SparkPlan) extends UnaryNode with ObjectOperator {
+ child: SparkPlan) extends UnaryExecNode with ObjectOperator {
override def output: Seq[Attribute] = outputObjAttr :: Nil
override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
@@ -310,7 +313,7 @@ case class MapGroups(
* iterators containing all elements in the group from left and right side.
* The result of this function is flattened before being output.
*/
-case class CoGroup(
+case class CoGroupExec(
func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any],
keyDeserializer: Expression,
leftDeserializer: Expression,
@@ -321,7 +324,7 @@ case class CoGroup(
rightAttr: Seq[Attribute],
outputObjAttr: Attribute,
left: SparkPlan,
- right: SparkPlan) extends BinaryNode with ObjectOperator {
+ right: SparkPlan) extends BinaryExecNode with ObjectOperator {
override def output: Seq[Attribute] = outputObjAttr :: Nil
override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
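
The contract of MapGroupsExec and CoGroupExec above is: group the input by the grouping attributes, hand the user function the key plus an iterator over the group's rows, and flatten whatever it returns. A minimal, self-contained sketch of the single-input case over plain collections (illustrative only; group ordering is not guaranteed here):

object MapGroupsExample {
  def mapGroups[K, V, O](rows: Seq[(K, V)])(f: (K, Iterator[V]) => TraversableOnce[O]): Seq[O] =
    rows.groupBy(_._1).toSeq.flatMap { case (key, group) =>
      f(key, group.iterator.map(_._2)) // the function sees the key and the group's values
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq("a" -> 1, "a" -> 2, "b" -> 3)
    // Emit one (key, sum) pair per group.
    println(mapGroups(rows)((k, vs) => Iterator(k -> vs.sum))) // e.g. List((a,3), (b,3))
  }
}
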
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index c49f173ad6..061d7c7f79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType}
* we drain the queue to find the original input row. Note that if the Python process is way too
 * slow, this could lead to the queue growing unbounded and eventually running out of memory.
*/
-case class BatchPythonEvaluation(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
+case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
extends SparkPlan {
def children: Seq[SparkPlan] = child :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index d72b3d347d..ab192360e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -79,7 +79,7 @@ private[spark] object ExtractPythonUDFs extends Rule[SparkPlan] {
val resultAttrs = udfs.zipWithIndex.map { case (u, i) =>
AttributeReference(s"pythonUDF$i", u.dataType)()
}
- val evaluation = BatchPythonEvaluation(validUdfs, child.output ++ resultAttrs, child)
+ val evaluation = BatchEvalPythonExec(validUdfs, child.output ++ resultAttrs, child)
attributeMap ++= validUdfs.zip(resultAttrs)
evaluation
} else {
@@ -105,7 +105,7 @@ private[spark] object ExtractPythonUDFs extends Rule[SparkPlan] {
val newPlan = extract(rewritten)
if (newPlan.output != plan.output) {
// Trim away the new UDF value if it was only used for filtering or something.
- execution.Project(plan.output, newPlan)
+ execution.ProjectExec(plan.output, newPlan)
} else {
newPlan
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 81244ed874..a1a1108447 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -18,11 +18,10 @@
package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.analysis.{OutputMode, UnsupportedOperationChecker}
+import org.apache.spark.sql.catalyst.analysis.OutputMode
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryNode}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
/**
* A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
@@ -54,17 +53,17 @@ class IncrementalExecution(
/** Locates save/restore pairs surrounding aggregation. */
val state = new Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan transform {
- case StateStoreSave(keys, None,
- UnaryNode(agg,
- StateStoreRestore(keys2, None, child))) =>
+ case StateStoreSaveExec(keys, None,
+ UnaryExecNode(agg,
+ StateStoreRestoreExec(keys2, None, child))) =>
val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1)
operatorId += 1
- StateStoreSave(
+ StateStoreSaveExec(
keys,
Some(stateId),
agg.withNewChildren(
- StateStoreRestore(
+ StateStoreRestoreExec(
keys,
Some(stateId),
child) :: Nil))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 595774761c..de4305f564 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -50,10 +50,11 @@ trait StatefulOperator extends SparkPlan {
* For each input tuple, the key is calculated and the value from the [[StateStore]] is added
* to the stream (in addition to the input tuple) if present.
*/
-case class StateStoreRestore(
+case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateId: Option[OperatorStateId],
- child: SparkPlan) extends execution.UnaryNode with StatefulOperator {
+ child: SparkPlan)
+ extends execution.UnaryExecNode with StatefulOperator {
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsWithStateStore(
@@ -78,10 +79,11 @@ case class StateStoreRestore(
/**
* For each input tuple, the key is calculated and the tuple is `put` into the [[StateStore]].
*/
-case class StateStoreSave(
+case class StateStoreSaveExec(
keyExpressions: Seq[Attribute],
stateId: Option[OperatorStateId],
- child: SparkPlan) extends execution.UnaryNode with StatefulOperator {
+ child: SparkPlan)
+ extends execution.UnaryExecNode with StatefulOperator {
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsWithStateStore(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index c023cc573c..1341e45483 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -40,7 +40,7 @@ case class ScalarSubquery(
override def withNewPlan(plan: LogicalPlan): SubqueryExpression = {
throw new UnsupportedOperationException
}
- override def plan: SparkPlan = Subquery(simpleString, executedPlan)
+ override def plan: SparkPlan = SubqueryExec(simpleString, executedPlan)
override def dataType: DataType = executedPlan.schema.fields.head.dataType
override def children: Seq[Expression] = Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index c6fcb6956c..1959f1e368 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.commons.lang3.StringEscapeUtils
-import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegen}
+import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegenExec}
import org.apache.spark.sql.execution.metric.SQLMetrics
/**
@@ -178,7 +178,7 @@ private[ui] class SparkPlanGraphCluster(
extends SparkPlanGraphNode(id, name, desc, Map.empty, metrics) {
override def makeDotNode(metricsValue: Map[Long, String]): String = {
- val duration = metrics.filter(_.name.startsWith(WholeStageCodegen.PIPELINE_DURATION_METRIC))
+ val duration = metrics.filter(_.name.startsWith(WholeStageCodegenExec.PIPELINE_DURATION_METRIC))
val labelStr = if (duration.nonEmpty) {
require(duration.length == 1)
val id = duration(0).accumulatorId
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 82b79c791d..4aea21e52a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -23,7 +23,7 @@ import scala.language.postfixOps
import org.scalatest.concurrent.Eventually._
import org.apache.spark.Accumulators
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.RDDScanExec
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.functions._
@@ -38,7 +38,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
def rddIdOf(tableName: String): Int = {
val plan = sqlContext.table(tableName).queryExecution.sparkPlan
plan.collect {
- case InMemoryColumnarTableScan(_, _, relation) =>
+ case InMemoryTableScanExec(_, _, relation) =>
relation.cachedColumnBuffers.id
case _ =>
fail(s"Table $tableName is not cached\n" + plan)
@@ -167,7 +167,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
sqlContext.cacheTable("testData")
assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") {
sqlContext.table("testData").queryExecution.withCachedData.collect {
- case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) => r
+ case r @ InMemoryRelation(_, _, _, _, _: InMemoryTableScanExec, _) => r
}.size
}
@@ -351,8 +351,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
|abc a join abc b on a.key=b.key
|join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
- assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3)
- assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0)
+ assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 3)
+ assert(sparkPlan.collect { case e: RDDScanExec => e }.size === 0)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 351b03b38b..19fe29a202 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.scalatest.Matchers._
import org.apache.spark.sql.catalyst.expressions.NamedExpression
-import org.apache.spark.sql.execution.Project
+import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -631,7 +631,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
val projects = df.queryExecution.sparkPlan.collect {
- case tungstenProject: Project => tungstenProject
+ case tungstenProject: ProjectExec => tungstenProject
}
assert(projects.size === expectedNumProjects)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 067a62d011..0414fa1c91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.Join
-import org.apache.spark.sql.execution.joins.BroadcastHashJoin
+import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
@@ -142,11 +142,11 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
// equijoin - should be converted into broadcast join
val plan1 = df1.join(broadcast(df2), "key").queryExecution.sparkPlan
- assert(plan1.collect { case p: BroadcastHashJoin => p }.size === 1)
+ assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size === 1)
// no join key -- should not be a broadcast join
val plan2 = df1.join(broadcast(df2)).queryExecution.sparkPlan
- assert(plan2.collect { case p: BroadcastHashJoin => p }.size === 0)
+ assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size === 0)
// planner should not crash without a join
broadcast(df1).queryExecution.sparkPlan
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index e953a6e8ef..4c18784126 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.aggregate.TungstenAggregate
-import org.apache.spark.sql.execution.exchange.{BroadcastExchange, ReusedExchange, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
@@ -1355,16 +1355,18 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
checkAnswer(join, df)
assert(
join.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
- assert(join.queryExecution.executedPlan.collect { case e: ReusedExchange => true }.size === 1)
+ assert(
+ join.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 1)
val broadcasted = broadcast(join)
val join2 = join.join(broadcasted, "id").join(broadcasted, "id")
checkAnswer(join2, df)
assert(
join2.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
assert(
- join2.queryExecution.executedPlan.collect { case e: BroadcastExchange => true }.size === 1)
+ join2.queryExecution.executedPlan
+ .collect { case e: BroadcastExchangeExec => true }.size === 1)
assert(
- join2.queryExecution.executedPlan.collect { case e: ReusedExchange => true }.size === 4)
+ join2.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 4)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index a87a41c126..9e5a41d57c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -46,11 +46,11 @@ class JoinSuite extends QueryTest with SharedSQLContext {
val df = sql(sqlString)
val physical = df.queryExecution.sparkPlan
val operators = physical.collect {
- case j: BroadcastHashJoin => j
- case j: ShuffledHashJoin => j
- case j: CartesianProduct => j
- case j: BroadcastNestedLoopJoin => j
- case j: SortMergeJoin => j
+ case j: BroadcastHashJoinExec => j
+ case j: ShuffledHashJoinExec => j
+ case j: CartesianProductExec => j
+ case j: BroadcastNestedLoopJoinExec => j
+ case j: SortMergeJoinExec => j
}
assert(operators.size === 1)
@@ -64,39 +64,43 @@ class JoinSuite extends QueryTest with SharedSQLContext {
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
Seq(
- ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
- ("SELECT * FROM testData LEFT SEMI JOIN testData2", classOf[BroadcastNestedLoopJoin]),
- ("SELECT * FROM testData JOIN testData2", classOf[CartesianProduct]),
- ("SELECT * FROM testData JOIN testData2 WHERE key = 2", classOf[CartesianProduct]),
- ("SELECT * FROM testData LEFT JOIN testData2", classOf[BroadcastNestedLoopJoin]),
- ("SELECT * FROM testData RIGHT JOIN testData2", classOf[BroadcastNestedLoopJoin]),
- ("SELECT * FROM testData FULL OUTER JOIN testData2", classOf[BroadcastNestedLoopJoin]),
+ ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
+ classOf[ShuffledHashJoinExec]),
+ ("SELECT * FROM testData LEFT SEMI JOIN testData2", classOf[BroadcastNestedLoopJoinExec]),
+ ("SELECT * FROM testData JOIN testData2", classOf[CartesianProductExec]),
+ ("SELECT * FROM testData JOIN testData2 WHERE key = 2", classOf[CartesianProductExec]),
+ ("SELECT * FROM testData LEFT JOIN testData2", classOf[BroadcastNestedLoopJoinExec]),
+ ("SELECT * FROM testData RIGHT JOIN testData2", classOf[BroadcastNestedLoopJoinExec]),
+ ("SELECT * FROM testData FULL OUTER JOIN testData2", classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData LEFT JOIN testData2 WHERE key = 2",
- classOf[BroadcastNestedLoopJoin]),
- ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2", classOf[CartesianProduct]),
+ classOf[BroadcastNestedLoopJoinExec]),
+ ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2",
+ classOf[CartesianProductExec]),
("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2",
- classOf[BroadcastNestedLoopJoin]),
- ("SELECT * FROM testData JOIN testData2 WHERE key > a", classOf[CartesianProduct]),
+ classOf[BroadcastNestedLoopJoinExec]),
+ ("SELECT * FROM testData JOIN testData2 WHERE key > a", classOf[CartesianProductExec]),
("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a",
- classOf[CartesianProduct]),
- ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
- ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
- ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]),
- ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeJoin]),
+ classOf[CartesianProductExec]),
+ ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoinExec]),
+ ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2",
+ classOf[SortMergeJoinExec]),
+ ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2",
+ classOf[SortMergeJoinExec]),
+ ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeJoinExec]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
- classOf[SortMergeJoin]),
+ classOf[SortMergeJoinExec]),
("SELECT * FROM testData right join testData2 ON key = a and key = 2",
- classOf[SortMergeJoin]),
+ classOf[SortMergeJoinExec]),
("SELECT * FROM testData full outer join testData2 ON key = a",
- classOf[SortMergeJoin]),
+ classOf[SortMergeJoinExec]),
("SELECT * FROM testData left JOIN testData2 ON (key * a != key + a)",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData right JOIN testData2 ON (key * a != key + a)",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
- classOf[BroadcastNestedLoopJoin]),
- ("SELECT * FROM testData ANTI JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
- ("SELECT * FROM testData LEFT ANTI JOIN testData2", classOf[BroadcastNestedLoopJoin])
+ classOf[BroadcastNestedLoopJoinExec]),
+ ("SELECT * FROM testData ANTI JOIN testData2 ON key = a", classOf[ShuffledHashJoinExec]),
+ ("SELECT * FROM testData LEFT ANTI JOIN testData2", classOf[BroadcastNestedLoopJoinExec])
).foreach(assertJoin)
}
}
@@ -112,11 +116,11 @@ class JoinSuite extends QueryTest with SharedSQLContext {
sql("CACHE TABLE testData")
Seq(
("SELECT * FROM testData join testData2 ON key = a",
- classOf[BroadcastHashJoin]),
+ classOf[BroadcastHashJoinExec]),
("SELECT * FROM testData join testData2 ON key = a and key = 2",
- classOf[BroadcastHashJoin]),
+ classOf[BroadcastHashJoinExec]),
("SELECT * FROM testData join testData2 ON key = a where key = 2",
- classOf[BroadcastHashJoin])
+ classOf[BroadcastHashJoinExec])
).foreach(assertJoin)
sql("UNCACHE TABLE testData")
}
@@ -127,11 +131,11 @@ class JoinSuite extends QueryTest with SharedSQLContext {
sql("CACHE TABLE testData2")
Seq(
("SELECT * FROM testData LEFT JOIN testData2 ON key = a",
- classOf[BroadcastHashJoin]),
+ classOf[BroadcastHashJoinExec]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
- classOf[BroadcastHashJoin]),
+ classOf[BroadcastHashJoinExec]),
("SELECT * FROM testData right join testData2 ON key = a and key = 2",
- classOf[BroadcastHashJoin])
+ classOf[BroadcastHashJoinExec])
).foreach(assertJoin)
sql("UNCACHE TABLE testData")
}
@@ -428,15 +432,18 @@ class JoinSuite extends QueryTest with SharedSQLContext {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") {
Seq(
- ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[BroadcastHashJoin]),
- ("SELECT * FROM testData ANT JOIN testData2 ON key = a", classOf[BroadcastHashJoin])
+ ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
+ classOf[BroadcastHashJoinExec]),
+ ("SELECT * FROM testData ANT JOIN testData2 ON key = a", classOf[BroadcastHashJoinExec])
).foreach(assertJoin)
}
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Seq(
- ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
- ("SELECT * FROM testData LEFT ANTI JOIN testData2 ON key = a", classOf[ShuffledHashJoin])
+ ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
+ classOf[ShuffledHashJoinExec]),
+ ("SELECT * FROM testData LEFT ANTI JOIN testData2 ON key = a",
+ classOf[ShuffledHashJoinExec])
).foreach(assertJoin)
}
@@ -460,35 +467,35 @@ class JoinSuite extends QueryTest with SharedSQLContext {
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
- classOf[ShuffledHashJoin]),
+ classOf[ShuffledHashJoinExec]),
("SELECT * FROM testData LEFT SEMI JOIN testData2",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData JOIN testData2",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData JOIN testData2 WHERE key = 2",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData LEFT JOIN testData2",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData RIGHT JOIN testData2",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData FULL OUTER JOIN testData2",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData LEFT JOIN testData2 WHERE key = 2",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData JOIN testData2 WHERE key > a",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData left JOIN testData2 WHERE (key * a != key + a)",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData right JOIN testData2 WHERE (key * a != key + a)",
- classOf[BroadcastNestedLoopJoin]),
+ classOf[BroadcastNestedLoopJoinExec]),
("SELECT * FROM testData full JOIN testData2 WHERE (key * a != key + a)",
- classOf[BroadcastNestedLoopJoin])
+ classOf[BroadcastNestedLoopJoinExec])
).foreach(assertJoin)
checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9e640493cf..84f0c0083b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
@@ -866,12 +866,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-11111 null-safe join should not use cartesian product") {
val df = sql("select count(*) from testData a join testData b on (a.key <=> b.key)")
val cp = df.queryExecution.sparkPlan.collect {
- case cp: CartesianProduct => cp
+ case cp: CartesianProductExec => cp
}
assert(cp.isEmpty, "should not use CartesianProduct for null-safe join")
val smj = df.queryExecution.sparkPlan.collect {
- case smj: SortMergeJoin => smj
- case j: BroadcastHashJoin => j
+ case smj: SortMergeJoinExec => smj
+ case j: BroadcastHashJoinExec => j
}
assert(smj.size > 0, "should use SortMergeJoin or BroadcastHashJoin")
checkAnswer(df, Row(100) :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
index 17f2343cf9..ba16810cee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, IdentityBroadcastMode, SinglePartition}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchange, ReusedExchange, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
import org.apache.spark.sql.test.SharedSQLContext
@@ -55,13 +55,13 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
val output = plan.output
assert(plan sameResult plan)
- val exchange1 = BroadcastExchange(IdentityBroadcastMode, plan)
+ val exchange1 = BroadcastExchangeExec(IdentityBroadcastMode, plan)
val hashMode = HashedRelationBroadcastMode(output)
- val exchange2 = BroadcastExchange(hashMode, plan)
+ val exchange2 = BroadcastExchangeExec(hashMode, plan)
val hashMode2 =
HashedRelationBroadcastMode(Alias(output.head, "id2")() :: Nil)
- val exchange3 = BroadcastExchange(hashMode2, plan)
- val exchange4 = ReusedExchange(output, exchange3)
+ val exchange3 = BroadcastExchangeExec(hashMode2, plan)
+ val exchange4 = ReusedExchangeExec(output, exchange3)
assert(exchange1 sameResult exchange1)
assert(exchange2 sameResult exchange2)
@@ -87,7 +87,7 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
val exchange3 = ShuffleExchange(part2, plan)
val part3 = HashPartitioning(output ++ output, 2)
val exchange4 = ShuffleExchange(part3, plan)
- val exchange5 = ReusedExchange(output, exchange4)
+ val exchange5 = ReusedExchangeExec(output, exchange4)
assert(exchange1 sameResult exchange1)
assert(exchange2 sameResult exchange2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index bdbcf842ca..3b2911d056 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchange, ReuseExchange, ShuffleExchange}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchange}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -86,8 +86,8 @@ class PlannerSuite extends SharedSQLContext {
|FROM testData2 l JOIN (SELECT * FROM testLimit LIMIT 1) r ON (l.a = r.key)
""".stripMargin).queryExecution.sparkPlan
- val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
- val sortMergeJoins = planned.collect { case join: SortMergeJoin => join }
+ val broadcastHashJoins = planned.collect { case join: BroadcastHashJoinExec => join }
+ val sortMergeJoins = planned.collect { case join: SortMergeJoinExec => join }
assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
assert(sortMergeJoins.isEmpty, "Should not use sort merge join")
@@ -139,8 +139,8 @@ class PlannerSuite extends SharedSQLContext {
val b = sqlContext.table("tiny").as("b")
val planned = a.join(b, $"a.key" === $"b.key").queryExecution.sparkPlan
- val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
- val sortMergeJoins = planned.collect { case join: SortMergeJoin => join }
+ val broadcastHashJoins = planned.collect { case join: BroadcastHashJoinExec => join }
+ val sortMergeJoins = planned.collect { case join: SortMergeJoinExec => join }
assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
assert(sortMergeJoins.isEmpty, "Should not use shuffled hash join")
@@ -167,34 +167,34 @@ class PlannerSuite extends SharedSQLContext {
test("efficient terminal limit -> sort should use TakeOrderedAndProject") {
val query = testData.select('key, 'value).sort('key).limit(2)
val planned = query.queryExecution.executedPlan
- assert(planned.isInstanceOf[execution.TakeOrderedAndProject])
+ assert(planned.isInstanceOf[execution.TakeOrderedAndProjectExec])
assert(planned.output === testData.select('key, 'value).logicalPlan.output)
}
test("terminal limit -> project -> sort should use TakeOrderedAndProject") {
val query = testData.select('key, 'value).sort('key).select('value, 'key).limit(2)
val planned = query.queryExecution.executedPlan
- assert(planned.isInstanceOf[execution.TakeOrderedAndProject])
+ assert(planned.isInstanceOf[execution.TakeOrderedAndProjectExec])
assert(planned.output === testData.select('value, 'key).logicalPlan.output)
}
test("terminal limits that are not handled by TakeOrderedAndProject should use CollectLimit") {
val query = testData.select('value).limit(2)
val planned = query.queryExecution.sparkPlan
- assert(planned.isInstanceOf[CollectLimit])
+ assert(planned.isInstanceOf[CollectLimitExec])
assert(planned.output === testData.select('value).logicalPlan.output)
}
test("TakeOrderedAndProject can appear in the middle of plans") {
val query = testData.select('key, 'value).sort('key).limit(2).filter('key === 3)
val planned = query.queryExecution.executedPlan
- assert(planned.find(_.isInstanceOf[TakeOrderedAndProject]).isDefined)
+ assert(planned.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isDefined)
}
test("CollectLimit can appear in the middle of a plan when caching is used") {
val query = testData.select('key, 'value).limit(2).cache()
val planned = query.queryExecution.optimizedPlan.asInstanceOf[InMemoryRelation]
- assert(planned.child.isInstanceOf[CollectLimit])
+ assert(planned.child.isInstanceOf[CollectLimitExec])
}
test("PartitioningCollection") {
@@ -394,7 +394,7 @@ class PlannerSuite extends SharedSQLContext {
)
val outputPlan = EnsureRequirements(sqlContext.sessionState.conf).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: Sort => true }.isEmpty) {
+ if (outputPlan.collect { case s: SortExec => true }.isEmpty) {
fail(s"Sort should have been added:\n$outputPlan")
}
}
@@ -410,7 +410,7 @@ class PlannerSuite extends SharedSQLContext {
)
val outputPlan = EnsureRequirements(sqlContext.sessionState.conf).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: Sort => true }.nonEmpty) {
+ if (outputPlan.collect { case s: SortExec => true }.nonEmpty) {
fail(s"No sorts should have been added:\n$outputPlan")
}
}
@@ -427,7 +427,7 @@ class PlannerSuite extends SharedSQLContext {
)
val outputPlan = EnsureRequirements(sqlContext.sessionState.conf).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: Sort => true }.isEmpty) {
+ if (outputPlan.collect { case s: SortExec => true }.isEmpty) {
fail(s"Sort should have been added:\n$outputPlan")
}
}
@@ -485,7 +485,7 @@ class PlannerSuite extends SharedSQLContext {
requiredChildOrdering = Seq(Seq.empty)),
None)
- val inputPlan = SortMergeJoin(
+ val inputPlan = SortMergeJoinExec(
Literal(1) :: Nil,
Literal(1) :: Nil,
Inner,
@@ -494,7 +494,7 @@ class PlannerSuite extends SharedSQLContext {
shuffle)
val outputPlan = ReuseExchange(sqlContext.sessionState.conf).apply(inputPlan)
- if (outputPlan.collect { case e: ReusedExchange => true }.size != 1) {
+ if (outputPlan.collect { case e: ReusedExchangeExec => true }.size != 1) {
fail(s"Should re-use the shuffle:\n$outputPlan")
}
if (outputPlan.collect { case e: ShuffleExchange => true }.size != 1) {
@@ -502,7 +502,7 @@ class PlannerSuite extends SharedSQLContext {
}
// nested exchanges
- val inputPlan2 = SortMergeJoin(
+ val inputPlan2 = SortMergeJoinExec(
Literal(1) :: Nil,
Literal(1) :: Nil,
Inner,
@@ -511,7 +511,7 @@ class PlannerSuite extends SharedSQLContext {
ShuffleExchange(finalPartitioning, inputPlan))
val outputPlan2 = ReuseExchange(sqlContext.sessionState.conf).apply(inputPlan2)
- if (outputPlan2.collect { case e: ReusedExchange => true }.size != 2) {
+ if (outputPlan2.collect { case e: ReusedExchangeExec => true }.size != 2) {
fail(s"Should re-use the two shuffles:\n$outputPlan2")
}
if (outputPlan2.collect { case e: ShuffleExchange => true }.size != 2) {
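
For orientation, a sketch of the collect pattern this suite relies on after the rename, assuming a DataFrame whose join strategy is under test; the helper name is illustrative:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}

  // Sketch: count the physical join operators the planner selected for a query.
  def countJoinOperators(joined: DataFrame): (Int, Int) = {
    val planned = joined.queryExecution.sparkPlan
    val broadcastHashJoins = planned.collect { case j: BroadcastHashJoinExec => j }
    val sortMergeJoins = planned.collect { case j: SortMergeJoinExec => j }
    (broadcastHashJoins.size, sortMergeJoins.size)
  }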
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
index 2963a856d1..a19ea51af7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -34,7 +34,7 @@ case class ReferenceSort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan)
- extends UnaryNode {
+ extends UnaryExecNode {
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
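
The base-trait rename also reaches test-only operators such as ReferenceSort; a minimal pass-through operator on the new trait might look like the following (the NoOpExec name is illustrative, not part of the patch):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.Attribute
  import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

  // Sketch: a do-nothing physical operator built on UnaryExecNode (formerly UnaryNode).
  case class NoOpExec(child: SparkPlan) extends UnaryExecNode {
    override def output: Seq[Attribute] = child.output
    override protected def doExecute(): RDD[InternalRow] = child.execute()
  }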
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 778477660e..ebeb39b690 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -43,13 +43,13 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
checkAnswer(
input.toDF("a", "b", "c"),
- (child: SparkPlan) => Sort('a.asc :: 'b.asc :: Nil, global = true, child = child),
+ (child: SparkPlan) => SortExec('a.asc :: 'b.asc :: Nil, global = true, child = child),
input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
sortAnswers = false)
checkAnswer(
input.toDF("a", "b", "c"),
- (child: SparkPlan) => Sort('b.asc :: 'a.asc :: Nil, global = true, child = child),
+ (child: SparkPlan) => SortExec('b.asc :: 'a.asc :: Nil, global = true, child = child),
input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
sortAnswers = false)
}
@@ -57,8 +57,10 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
test("sort followed by limit") {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => GlobalLimit(10, Sort('a.asc :: Nil, global = true, child = child)),
- (child: SparkPlan) => GlobalLimit(10, ReferenceSort('a.asc :: Nil, global = true, child)),
+ (child: SparkPlan) =>
+ GlobalLimitExec(10, SortExec('a.asc :: Nil, global = true, child = child)),
+ (child: SparkPlan) =>
+ GlobalLimitExec(10, ReferenceSort('a.asc :: Nil, global = true, child)),
sortAnswers = false
)
}
@@ -68,7 +70,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
val stringLength = 1024 * 1024 * 2
checkThatPlansAgree(
Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
- Sort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+ SortExec(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
ReferenceSort(sortOrder, global = true, _: SparkPlan),
sortAnswers = false
)
@@ -78,7 +80,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe external sort") {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child = child),
+ (child: SparkPlan) => SortExec('a.asc :: Nil, global = true, child = child),
(child: SparkPlan) => ReferenceSort('a.asc :: Nil, global = true, child),
sortAnswers = false)
}
@@ -99,7 +101,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
)
checkThatPlansAgree(
inputDf,
- p => Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23),
+ p => SortExec(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23),
ReferenceSort(sortOrder, global = true, _: SparkPlan),
sortAnswers = false
)
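
A sketch of constructing the renamed SortExec directly, without the 'a.asc DSL shorthand the suite uses; the column lookup is illustrative and assumes the child exposes a column named "a":

  import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
  import org.apache.spark.sql.execution.{SortExec, SparkPlan}

  // Sketch: sort a child plan ascending on its column "a" via the renamed SortExec.
  def sortByA(child: SparkPlan): SparkPlan = {
    // Resolve the attribute named "a" from the child's output (assumed present).
    val a = child.output.find(_.name == "a").get
    SortExec(SortOrder(a, Ascending) :: Nil, global = true, child = child)
  }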
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index a4c6d072f3..fba04d0cb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -49,7 +49,7 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
* Adds a no-op filter to the child plan in order to prevent executeCollect() from being
* called directly on the child plan.
*/
- private def noOpFilter(plan: SparkPlan): SparkPlan = Filter(Literal(true), plan)
+ private def noOpFilter(plan: SparkPlan): SparkPlan = FilterExec(Literal(true), plan)
val limit = 250
val sortOrder = 'a.desc :: 'b.desc :: Nil
@@ -59,11 +59,11 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
checkThatPlansAgree(
generateRandomInputData(),
input =>
- noOpFilter(TakeOrderedAndProject(limit, sortOrder, None, input)),
+ noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, None, input)),
input =>
- GlobalLimit(limit,
- LocalLimit(limit,
- Sort(sortOrder, true, input))),
+ GlobalLimitExec(limit,
+ LocalLimitExec(limit,
+ SortExec(sortOrder, true, input))),
sortAnswers = false)
}
}
@@ -73,12 +73,13 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
checkThatPlansAgree(
generateRandomInputData(),
input =>
- noOpFilter(TakeOrderedAndProject(limit, sortOrder, Some(Seq(input.output.last)), input)),
+ noOpFilter(
+ TakeOrderedAndProjectExec(limit, sortOrder, Some(Seq(input.output.last)), input)),
input =>
- GlobalLimit(limit,
- LocalLimit(limit,
- Project(Seq(input.output.last),
- Sort(sortOrder, true, input)))),
+ GlobalLimitExec(limit,
+ LocalLimitExec(limit,
+ ProjectExec(Seq(input.output.last),
+ SortExec(sortOrder, true, input)))),
sortAnswers = false)
}
}
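
A sketch of the reference plan these tests compare TakeOrderedAndProjectExec against, composed from the renamed limit and sort operators; the helper name is illustrative:

  import org.apache.spark.sql.catalyst.expressions.SortOrder
  import org.apache.spark.sql.execution.{GlobalLimitExec, LocalLimitExec, SortExec, SparkPlan}

  // Sketch: TakeOrderedAndProjectExec(limit, sortOrder, None, input) is checked above
  // against a global limit over a local limit over a global sort of the same input.
  def referencePlan(limit: Int, sortOrder: Seq[SortOrder], input: SparkPlan): SparkPlan =
    GlobalLimitExec(limit, LocalLimitExec(limit, SortExec(sortOrder, global = true, child = input)))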
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index d7cf1dc6aa..233104ae84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.aggregate.TungstenAggregate
-import org.apache.spark.sql.execution.joins.BroadcastHashJoin
+import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.expressions.scala.typed
import org.apache.spark.sql.functions.{avg, broadcast, col, max}
import org.apache.spark.sql.test.SharedSQLContext
@@ -30,7 +30,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
test("range/filter should be combined") {
val df = sqlContext.range(10).filter("id = 1").selectExpr("id + 1")
val plan = df.queryExecution.executedPlan
- assert(plan.find(_.isInstanceOf[WholeStageCodegen]).isDefined)
+ assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined)
assert(df.collect() === Array(Row(2)))
}
@@ -38,8 +38,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
val df = sqlContext.range(10).groupBy().agg(max(col("id")), avg(col("id")))
val plan = df.queryExecution.executedPlan
assert(plan.find(p =>
- p.isInstanceOf[WholeStageCodegen] &&
- p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[TungstenAggregate]).isDefined)
+ p.isInstanceOf[WholeStageCodegenExec] &&
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined)
assert(df.collect() === Array(Row(9, 4.5)))
}
@@ -47,8 +47,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
val df = sqlContext.range(3).groupBy("id").count().orderBy("id")
val plan = df.queryExecution.executedPlan
assert(plan.find(p =>
- p.isInstanceOf[WholeStageCodegen] &&
- p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[TungstenAggregate]).isDefined)
+ p.isInstanceOf[WholeStageCodegenExec] &&
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined)
assert(df.collect() === Array(Row(0, 1), Row(1, 1), Row(2, 1)))
}
@@ -58,8 +58,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
val smallDF = sqlContext.createDataFrame(rdd, schema)
val df = sqlContext.range(10).join(broadcast(smallDF), col("k") === col("id"))
assert(df.queryExecution.executedPlan.find(p =>
- p.isInstanceOf[WholeStageCodegen] &&
- p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[BroadcastHashJoin]).isDefined)
+ p.isInstanceOf[WholeStageCodegenExec] &&
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[BroadcastHashJoinExec]).isDefined)
assert(df.collect() === Array(Row(1, 1, "1"), Row(1, 1, "1"), Row(2, 2, "2")))
}
@@ -67,8 +67,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
val df = sqlContext.range(3, 0, -1).toDF().sort(col("id"))
val plan = df.queryExecution.executedPlan
assert(plan.find(p =>
- p.isInstanceOf[WholeStageCodegen] &&
- p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[Sort]).isDefined)
+ p.isInstanceOf[WholeStageCodegenExec] &&
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]).isDefined)
assert(df.collect() === Array(Row(1), Row(2), Row(3)))
}
@@ -78,8 +78,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
val ds = sqlContext.range(10).map(_.toString)
val plan = ds.queryExecution.executedPlan
assert(plan.find(p =>
- p.isInstanceOf[WholeStageCodegen] &&
- p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[SerializeFromObject]).isDefined)
+ p.isInstanceOf[WholeStageCodegenExec] &&
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined)
assert(ds.collect() === 0.until(10).map(_.toString).toArray)
}
@@ -87,8 +87,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
val ds = sqlContext.range(10).filter(_ % 2 == 0)
val plan = ds.queryExecution.executedPlan
assert(plan.find(p =>
- p.isInstanceOf[WholeStageCodegen] &&
- p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[Filter]).isDefined)
+ p.isInstanceOf[WholeStageCodegenExec] &&
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined)
assert(ds.collect() === Array(0, 2, 4, 6, 8))
}
@@ -96,8 +96,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
val ds = sqlContext.range(10).filter(_ % 2 == 0).filter(_ % 3 == 0)
val plan = ds.queryExecution.executedPlan
assert(plan.find(p =>
- p.isInstanceOf[WholeStageCodegen] &&
- p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[SerializeFromObject]).isDefined)
+ p.isInstanceOf[WholeStageCodegenExec] &&
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined)
assert(ds.collect() === Array(0, 6))
}
@@ -109,8 +109,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
val plan = ds.queryExecution.executedPlan
assert(plan.find(p =>
- p.isInstanceOf[WholeStageCodegen] &&
- p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[TungstenAggregate]).isDefined)
+ p.isInstanceOf[WholeStageCodegenExec] &&
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined)
assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
}
}
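
A sketch of the wrapping check repeated throughout this suite, generalized over the expected child operator class; the helper name and the Class parameter are illustrative:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}

  // Sketch: does the executed plan contain a WholeStageCodegenExec whose direct
  // child is an instance of the given physical operator class?
  def codegensOver(df: DataFrame, childClass: Class[_ <: SparkPlan]): Boolean =
    df.queryExecution.executedPlan.find {
      case w: WholeStageCodegenExec => childClass.isInstance(w.child)
      case _ => false
    }.isDefined

For example, codegensOver(df, classOf[SortExec]) would mirror the sort assertion above.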
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index 4f185ed283..9164074a3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -133,7 +133,7 @@ class PartitionBatchPruningSuite
}
val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
- case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value)
+ case in: InMemoryTableScanExec => (in.readPartitions.value, in.readBatches.value)
}.head
assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 50cd03a40c..fb70dbd961 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util
-import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -375,7 +375,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
def getPhysicalFilters(df: DataFrame): ExpressionSet = {
ExpressionSet(
df.queryExecution.executedPlan.collect {
- case execution.Filter(f, _) => splitConjunctivePredicates(f)
+ case execution.FilterExec(f, _) => splitConjunctivePredicates(f)
}.flatten)
}
@@ -422,7 +422,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
def getFileScanRDD(df: DataFrame): FileScanRDD = {
df.queryExecution.executedPlan.collect {
- case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+ case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
scan.rdd.asInstanceOf[FileScanRDD]
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index babe7ef70f..b9df43d049 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -71,15 +71,15 @@ class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll {
}
test("unsafe broadcast hash join updates peak execution memory") {
- testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast hash join", "inner")
+ testBroadcastJoin[BroadcastHashJoinExec]("unsafe broadcast hash join", "inner")
}
test("unsafe broadcast hash outer join updates peak execution memory") {
- testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast hash outer join", "left_outer")
+ testBroadcastJoin[BroadcastHashJoinExec]("unsafe broadcast hash outer join", "left_outer")
}
test("unsafe broadcast left semi join updates peak execution memory") {
- testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast left semi join", "leftsemi")
+ testBroadcastJoin[BroadcastHashJoinExec]("unsafe broadcast left semi join", "leftsemi")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
index 8cdfa8afd0..bc838ee4da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
@@ -83,7 +83,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
EnsureRequirements(left.sqlContext.sessionState.conf).apply(
- ShuffledHashJoin(
+ ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildRight, boundCondition, left, right)),
expectedAnswer,
sortAnswers = true)
@@ -96,7 +96,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
EnsureRequirements(left.sqlContext.sessionState.conf).apply(
- BroadcastHashJoin(
+ BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, BuildRight, boundCondition, left, right)),
expectedAnswer,
sortAnswers = true)
@@ -108,7 +108,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
EnsureRequirements(left.sqlContext.sessionState.conf).apply(
- BroadcastNestedLoopJoin(left, right, BuildLeft, joinType, Some(condition))),
+ BroadcastNestedLoopJoinExec(left, right, BuildLeft, joinType, Some(condition))),
expectedAnswer,
sortAnswers = true)
}
@@ -118,7 +118,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
EnsureRequirements(left.sqlContext.sessionState.conf).apply(
- BroadcastNestedLoopJoin(left, right, BuildRight, joinType, Some(condition))),
+ BroadcastNestedLoopJoinExec(left, right, BuildRight, joinType, Some(condition))),
expectedAnswer,
sortAnswers = true)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 3cb3ef1ffa..933f32e496 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -91,7 +91,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
leftPlan: SparkPlan,
rightPlan: SparkPlan,
side: BuildSide) = {
- val broadcastJoin = joins.BroadcastHashJoin(
+ val broadcastJoin = joins.BroadcastHashJoinExec(
leftKeys,
rightKeys,
Inner,
@@ -110,9 +110,9 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
rightPlan: SparkPlan,
side: BuildSide) = {
val shuffledHashJoin =
- joins.ShuffledHashJoin(leftKeys, rightKeys, Inner, side, None, leftPlan, rightPlan)
+ joins.ShuffledHashJoinExec(leftKeys, rightKeys, Inner, side, None, leftPlan, rightPlan)
val filteredJoin =
- boundCondition.map(Filter(_, shuffledHashJoin)).getOrElse(shuffledHashJoin)
+ boundCondition.map(FilterExec(_, shuffledHashJoin)).getOrElse(shuffledHashJoin)
EnsureRequirements(sqlContext.sessionState.conf).apply(filteredJoin)
}
@@ -123,7 +123,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
leftPlan: SparkPlan,
rightPlan: SparkPlan) = {
val sortMergeJoin =
- joins.SortMergeJoin(leftKeys, rightKeys, Inner, boundCondition, leftPlan, rightPlan)
+ joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition, leftPlan, rightPlan)
EnsureRequirements(sqlContext.sessionState.conf).apply(sortMergeJoin)
}
@@ -189,7 +189,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
test(s"$testName using CartesianProduct") {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- Filter(condition(), CartesianProduct(left, right)),
+ FilterExec(condition(), CartesianProductExec(left, right)),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}
@@ -198,7 +198,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
test(s"$testName using BroadcastNestedLoopJoin build left") {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- BroadcastNestedLoopJoin(left, right, BuildLeft, Inner, Some(condition())),
+ BroadcastNestedLoopJoinExec(left, right, BuildLeft, Inner, Some(condition())),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}
@@ -207,7 +207,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
test(s"$testName using BroadcastNestedLoopJoin build right") {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- BroadcastNestedLoopJoin(left, right, BuildRight, Inner, Some(condition())),
+ BroadcastNestedLoopJoinExec(left, right, BuildRight, Inner, Some(condition())),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index 4cacb20aa0..c26cb8483e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -83,7 +83,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
val buildSide = if (joinType == LeftOuter) BuildRight else BuildLeft
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
EnsureRequirements(sqlContext.sessionState.conf).apply(
- ShuffledHashJoin(
+ ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, boundCondition, left, right)),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
@@ -102,7 +102,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- BroadcastHashJoin(
+ BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, boundCondition, left, right),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
@@ -116,7 +116,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
EnsureRequirements(sqlContext.sessionState.conf).apply(
- SortMergeJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)),
+ SortMergeJoinExec(leftKeys, rightKeys, joinType, boundCondition, left, right)),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}
@@ -126,7 +126,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
test(s"$testName using BroadcastNestedLoopJoin build left") {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- BroadcastNestedLoopJoin(left, right, BuildLeft, joinType, Some(condition)),
+ BroadcastNestedLoopJoinExec(left, right, BuildLeft, joinType, Some(condition)),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}
@@ -135,7 +135,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
test(s"$testName using BroadcastNestedLoopJoin build right") {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- BroadcastNestedLoopJoin(left, right, BuildRight, joinType, Some(condition)),
+ BroadcastNestedLoopJoinExec(left, right, BuildRight, joinType, Some(condition)),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}
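
A sketch of building the renamed sort-merge join directly and letting EnsureRequirements add whatever exchanges and sorts its required child distribution and ordering call for; the helper name is illustrative:

  import org.apache.spark.sql.catalyst.expressions.Expression
  import org.apache.spark.sql.catalyst.plans.Inner
  import org.apache.spark.sql.execution.SparkPlan
  import org.apache.spark.sql.execution.exchange.EnsureRequirements
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.internal.SQLConf

  // Sketch: an inner sort-merge join with no extra condition, normalized by
  // EnsureRequirements (the suites above pass sqlContext.sessionState.conf).
  def innerSortMergeJoin(
      conf: SQLConf,
      leftKeys: Seq[Expression],
      rightKeys: Seq[Expression],
      left: SparkPlan,
      right: SparkPlan): SparkPlan = {
    val join = SortMergeJoinExec(leftKeys, rightKeys, Inner, None, left, right)
    EnsureRequirements(conf).apply(join)
  }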
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index f66deea065..c24abf1650 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
@@ -208,10 +208,10 @@ class JDBCSuite extends SparkFunSuite
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is removed in a physical plan and
// the plan only has PhysicalRDD to scan JDBCRelation.
- assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
- val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
- assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScan])
- assert(node.child.asInstanceOf[DataSourceScan].nodeName.contains("JDBCRelation"))
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+ val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+ assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+ assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
df
}
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
@@ -246,9 +246,9 @@ class JDBCSuite extends SparkFunSuite
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
// cannot compile given predicates.
- assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
- val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
- assert(node.child.isInstanceOf[org.apache.spark.sql.execution.Filter])
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+ val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+ assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
df
}
assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 19e34b45bf..14707774cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
- case p: execution.DataSourceScan => p
+ case p: execution.DataSourceScanExec => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index 62f991fc5d..9bb901bfb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
- case p: execution.DataSourceScan => p
+ case p: execution.DataSourceScanExec => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index f6150198dd..5691105235 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.Filter
+import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.util.Utils
/**
@@ -242,7 +242,7 @@ private[sql] trait SQLTestUtils
protected def stripSparkFilter(df: DataFrame): DataFrame = {
val schema = df.schema
val withoutFilters = df.queryExecution.sparkPlan transform {
- case Filter(_, child) => child
+ case FilterExec(_, child) => child
}
val childRDD = withoutFilters
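
A sketch of the transform stripSparkFilter applies after the rename, expressed over a bare physical plan:

  import org.apache.spark.sql.execution.{FilterExec, SparkPlan}

  // Sketch: splice out every physical filter by replacing it with its child.
  def stripFilters(plan: SparkPlan): SparkPlan = plan transform {
    case FilterExec(_, child) => child
  }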
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index e7d2b5ad96..eb25ea0629 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.sql.{functions, QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
-import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegen}
+import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec}
import org.apache.spark.sql.test.SharedSQLContext
class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
@@ -93,7 +93,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
val metric = qe.executedPlan match {
- case w: WholeStageCodegen => w.child.longMetric("numOutputRows")
+ case w: WholeStageCodegenExec => w.child.longMetric("numOutputRows")
case other => other.longMetric("numOutputRows")
}
metrics += metric.value.value
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index bc45334036..f15f5b01e2 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -224,7 +224,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val plan = statement.executeQuery("explain select * from test_table")
plan.next()
plan.next()
- assert(plan.getString(1).contains("InMemoryColumnarTableScan"))
+ assert(plan.getString(1).contains("InMemoryTableScan"))
val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
val buf1 = new collection.mutable.ArrayBuffer[Int]()
@@ -310,7 +310,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val plan = statement.executeQuery("explain select key from test_map ORDER BY key DESC")
plan.next()
plan.next()
- assert(plan.getString(1).contains("InMemoryColumnarTableScan"))
+ assert(plan.getString(1).contains("InMemoryTableScan"))
val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
val buf = new collection.mutable.ArrayBuffer[Int]()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5b7fbe0ce5..2d36ddafe6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -78,7 +78,7 @@ private[hive] trait HiveStrategies {
projectList,
otherPredicates,
identity[Seq[Expression]],
- HiveTableScan(_, relation, pruningPredicates)(context, hiveconf)) :: Nil
+ HiveTableScanExec(_, relation, pruningPredicates)(context, hiveconf)) :: Nil
case _ =>
Nil
}
@@ -91,17 +91,17 @@ private[hive] trait HiveStrategies {
val cmd =
CreateMetastoreDataSource(
tableIdent, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)
- ExecutedCommand(cmd) :: Nil
+ ExecutedCommandExec(cmd) :: Nil
case c: CreateTableUsingAsSelect if c.temporary =>
val cmd = CreateTempTableUsingAsSelect(
c.tableIdent, c.provider, c.partitionColumns, c.mode, c.options, c.child)
- ExecutedCommand(cmd) :: Nil
+ ExecutedCommandExec(cmd) :: Nil
case c: CreateTableUsingAsSelect =>
val cmd = CreateMetastoreDataSourceAsSelect(c.tableIdent, c.provider, c.partitionColumns,
c.bucketSpec, c.mode, c.options, c.child)
- ExecutedCommand(cmd) :: Nil
+ ExecutedCommandExec(cmd) :: Nil
case _ => Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 9a834660f9..0f72091096 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -44,13 +44,13 @@ import org.apache.spark.util.Utils
* @param partitionPruningPred An optional partition pruning predicate for partitioned table.
*/
private[hive]
-case class HiveTableScan(
+case class HiveTableScanExec(
requestedAttributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Seq[Expression])(
@transient val context: SQLContext,
@transient val hiveconf: HiveConf)
- extends LeafNode {
+ extends LeafExecNode {
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
"Partition pruning predicates only supported for partitioned tables.")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index e614daadf3..3cb6081687 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
@@ -41,7 +41,7 @@ case class InsertIntoHiveTable(
partition: Map[String, Option[String]],
child: SparkPlan,
overwrite: Boolean,
- ifNotExists: Boolean) extends UnaryNode {
+ ifNotExists: Boolean) extends UnaryExecNode {
@transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
@transient private val client = sessionState.metadataHive
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 8c8becfb87..f27337eb36 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -59,7 +59,7 @@ case class ScriptTransformation(
output: Seq[Attribute],
child: SparkPlan,
ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf)
- extends UnaryNode {
+ extends UnaryExecNode {
override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 11384a0275..97bd47a247 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
-import org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils
@@ -31,7 +31,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
def rddIdOf(tableName: String): Int = {
val plan = table(tableName).queryExecution.sparkPlan
plan.collect {
- case InMemoryColumnarTableScan(_, _, relation) =>
+ case InMemoryTableScanExec(_, _, relation) =>
relation.cachedColumnBuffers.id
case _ =>
fail(s"Table $tableName is not cached\n" + plan)
@@ -211,7 +211,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
cacheTable("cachedTable")
val sparkPlan = sql("SELECT * FROM cachedTable").queryExecution.sparkPlan
- assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 1)
+ assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 1)
sql("DROP TABLE cachedTable")
}
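
A sketch of the cache check used in this suite after the rename, assuming access to queryExecution as these tests have:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

  // Sketch: a cached table should be read through at least one InMemoryTableScanExec.
  def usesInMemoryScan(df: DataFrame): Boolean =
    df.queryExecution.sparkPlan.collect { case e: InMemoryTableScanExec => e }.nonEmpty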
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 565b310bb7..93a6f0bb58 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -153,7 +153,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
- var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+ var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
assert(bhj.size === 1,
s"actual query plans do not contain broadcast join: ${df.queryExecution}")
@@ -164,10 +164,10 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1""")
df = sql(query)
- bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+ bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
- val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoin => j }
+ val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoinExec => j }
assert(shj.size === 1,
"SortMergeJoin should be planned when BroadcastHashJoin is turned off")
@@ -210,7 +210,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
var bhj = df.queryExecution.sparkPlan.collect {
- case j: BroadcastHashJoin => j
+ case j: BroadcastHashJoinExec => j
}
assert(bhj.size === 1,
s"actual query plans do not contain broadcast join: ${df.queryExecution}")
@@ -223,12 +223,12 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1")
df = sql(leftSemiJoinQuery)
bhj = df.queryExecution.sparkPlan.collect {
- case j: BroadcastHashJoin => j
+ case j: BroadcastHashJoinExec => j
}
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
val shj = df.queryExecution.sparkPlan.collect {
- case j: ShuffledHashJoin => j
+ case j: ShuffledHashJoinExec => j
}
assert(shj.size === 1,
"LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 3ddffeb14a..aac5cc6d40 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -485,7 +485,7 @@ abstract class HiveComparisonTest
// also print out the query plans and results for those.
val computedTablesMessages: String = try {
val tablesRead = new TestHiveQueryExecution(query).executedPlan.collect {
- case ts: HiveTableScan => ts.relation.tableName
+ case ts: HiveTableScanExec => ts.relation.tableName
}.toSet
TestHive.reset()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 93d63f2241..467a67259f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkFiles}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
+import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.hive.test.TestHive._
@@ -121,7 +121,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
test("SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN") {
def assertBroadcastNestedLoopJoin(sqlText: String): Unit = {
assert(sql(sqlText).queryExecution.sparkPlan.collect {
- case _: BroadcastNestedLoopJoin => 1
+ case _: BroadcastNestedLoopJoinExec => 1
}.nonEmpty)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
index 6b424d7343..2de429bdab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.expressions.{Cast, EqualTo}
-import org.apache.spark.sql.execution.Project
+import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.hive.test.TestHive
/**
@@ -50,7 +50,7 @@ class HiveTypeCoercionSuite extends HiveComparisonTest {
test("[SPARK-2210] boolean cast on boolean value should be removed") {
val q = "select cast(cast(key=0 as boolean) as boolean) from src"
val project = TestHive.sql(q).queryExecution.sparkPlan.collect {
- case e: Project => e
+ case e: ProjectExec => e
}.head
// No cast expression introduced
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 12f30e2e74..24df73b40e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -151,7 +151,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
val plan = new TestHiveQueryExecution(sql).sparkPlan
val actualOutputColumns = plan.output.map(_.name)
val (actualScannedColumns, actualPartValues) = plan.collect {
- case p @ HiveTableScan(columns, relation, _) =>
+ case p @ HiveTableScanExec(columns, relation, _) =>
val columnNames = columns.map(_.name)
val partValues = if (relation.catalogTable.partitionColumnNames.nonEmpty) {
p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 00b5c8dd41..1a15fb741a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryNode}
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.types.StringType
@@ -111,7 +111,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
}
}
-private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryNode {
+private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
override protected def doExecute(): RDD[InternalRow] = {
child.execute().map { x =>
assert(TaskContext.get() != null) // Make sure that TaskContext is defined.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2984ee99be..1c1f6d910d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,10 +21,10 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.DataSourceScan
-import org.apache.spark.sql.execution.command.ExecutedCommand
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -192,11 +192,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test(s"conversion is working") {
assert(
sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
- case _: HiveTableScan => true
+ case _: HiveTableScanExec => true
}.isEmpty)
assert(
sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
- case _: DataSourceScan => true
+ case _: DataSourceScanExec => true
}.nonEmpty)
}
@@ -307,7 +307,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.sparkPlan match {
- case ExecutedCommand(_: InsertIntoHadoopFsRelation) => // OK
+ case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
@@ -337,7 +337,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.sparkPlan match {
- case ExecutedCommand(_: InsertIntoHadoopFsRelation) => // OK
+ case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan." +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index aa6101f7b7..d271e55467 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -22,10 +22,10 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
-import org.apache.spark.sql.execution.joins.SortMergeJoin
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -93,7 +93,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
- val rdd = plan.find(_.isInstanceOf[DataSourceScan])
+ val rdd = plan.find(_.isInstanceOf[DataSourceScanExec])
assert(rdd.isDefined, plan)
val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
@@ -261,8 +261,8 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
joined.sort("bucketed_table1.k", "bucketed_table2.k"),
df1.join(df2, joinCondition(df1, df2, joinColumns)).sort("df1.k", "df2.k"))
- assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoin])
- val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoin]
+ assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
+ val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
assert(
joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index a15bd227a9..19749a9713 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -152,8 +152,8 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
val physicalPlan = df.queryExecution.sparkPlan
- assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
- assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
+ assert(physicalPlan.collect { case p: execution.ProjectExec => p }.length === 1)
+ assert(physicalPlan.collect { case p: execution.FilterExec => p }.length === 1)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 089cef615f..5378336ff8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -688,7 +688,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
.load(path)
val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst {
- case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+ case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
scan.rdd.asInstanceOf[FileScanRDD]
}