author     Reynold Xin <rxin@databricks.com>   2016-04-22 17:43:56 -0700
committer  Reynold Xin <rxin@databricks.com>   2016-04-22 17:43:56 -0700
commit     d7d0cad0ad7667c0e09ae01601ee0e4d0b09963c (patch)
tree       10ab6d65e25c6451a1cdde83321c989a6e901856 /sql/hive
parent     c431a76d0628985bb445189b9a2913dd41b86f7b (diff)
download   spark-d7d0cad0ad7667c0e09ae01601ee0e4d0b09963c.tar.gz
           spark-d7d0cad0ad7667c0e09ae01601ee0e4d0b09963c.tar.bz2
           spark-d7d0cad0ad7667c0e09ae01601ee0e4d0b09963c.zip
[SPARK-14855][SQL] Add "Exec" suffix to physical operators
## What changes were proposed in this pull request?

This patch adds an "Exec" suffix to all physical operators. Before this patch, Spark's physical operators and logical operators shared the same names (e.g. Project could be either logical.Project or execution.Project), which caused small issues in code review and bigger issues in code refactoring.

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #12617 from rxin/exec-node.
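As a minimal sketch of the ambiguity this rename removes (my own illustration, not part of the patch; the helper name is hypothetical), a caller pattern-matching on a physical plan no longer needs a package prefix to keep the physical operator distinct from the logical node of the same name:

```scala
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}

// Hypothetical helper: collect the column names produced by every physical
// projection in a plan. Before this patch the match would have read
// `case p: execution.Project`, which is easy to confuse with the logical
// org.apache.spark.sql.catalyst.plans.logical.Project; the Exec suffix
// makes the physical node unambiguous on its own.
def projectedColumns(plan: SparkPlan): Seq[Seq[String]] =
  plan.collect { case p: ProjectExec => p.projectList.map(_.name) }
```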
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala (renamed from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala) | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 4
15 files changed, 42 insertions(+), 42 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5b7fbe0ce5..2d36ddafe6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -78,7 +78,7 @@ private[hive] trait HiveStrategies {
projectList,
otherPredicates,
identity[Seq[Expression]],
- HiveTableScan(_, relation, pruningPredicates)(context, hiveconf)) :: Nil
+ HiveTableScanExec(_, relation, pruningPredicates)(context, hiveconf)) :: Nil
case _ =>
Nil
}
@@ -91,17 +91,17 @@ private[hive] trait HiveStrategies {
val cmd =
CreateMetastoreDataSource(
tableIdent, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)
- ExecutedCommand(cmd) :: Nil
+ ExecutedCommandExec(cmd) :: Nil
case c: CreateTableUsingAsSelect if c.temporary =>
val cmd = CreateTempTableUsingAsSelect(
c.tableIdent, c.provider, c.partitionColumns, c.mode, c.options, c.child)
- ExecutedCommand(cmd) :: Nil
+ ExecutedCommandExec(cmd) :: Nil
case c: CreateTableUsingAsSelect =>
val cmd = CreateMetastoreDataSourceAsSelect(c.tableIdent, c.provider, c.partitionColumns,
c.bucketSpec, c.mode, c.options, c.child)
- ExecutedCommand(cmd) :: Nil
+ ExecutedCommandExec(cmd) :: Nil
case _ => Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 9a834660f9..0f72091096 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -44,13 +44,13 @@ import org.apache.spark.util.Utils
* @param partitionPruningPred An optional partition pruning predicate for partitioned table.
*/
private[hive]
-case class HiveTableScan(
+case class HiveTableScanExec(
requestedAttributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Seq[Expression])(
@transient val context: SQLContext,
@transient val hiveconf: HiveConf)
- extends LeafNode {
+ extends LeafExecNode {
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
"Partition pruning predicates only supported for partitioned tables.")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index e614daadf3..3cb6081687 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
@@ -41,7 +41,7 @@ case class InsertIntoHiveTable(
partition: Map[String, Option[String]],
child: SparkPlan,
overwrite: Boolean,
- ifNotExists: Boolean) extends UnaryNode {
+ ifNotExists: Boolean) extends UnaryExecNode {
@transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
@transient private val client = sessionState.metadataHive
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 8c8becfb87..f27337eb36 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -59,7 +59,7 @@ case class ScriptTransformation(
output: Seq[Attribute],
child: SparkPlan,
ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf)
- extends UnaryNode {
+ extends UnaryExecNode {
override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 11384a0275..97bd47a247 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
-import org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils
@@ -31,7 +31,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
def rddIdOf(tableName: String): Int = {
val plan = table(tableName).queryExecution.sparkPlan
plan.collect {
- case InMemoryColumnarTableScan(_, _, relation) =>
+ case InMemoryTableScanExec(_, _, relation) =>
relation.cachedColumnBuffers.id
case _ =>
fail(s"Table $tableName is not cached\n" + plan)
@@ -211,7 +211,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
cacheTable("cachedTable")
val sparkPlan = sql("SELECT * FROM cachedTable").queryExecution.sparkPlan
- assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 1)
+ assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 1)
sql("DROP TABLE cachedTable")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 565b310bb7..93a6f0bb58 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -153,7 +153,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
- var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+ var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
assert(bhj.size === 1,
s"actual query plans do not contain broadcast join: ${df.queryExecution}")
@@ -164,10 +164,10 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1""")
df = sql(query)
- bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+ bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
- val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoin => j }
+ val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoinExec => j }
assert(shj.size === 1,
"SortMergeJoin should be planned when BroadcastHashJoin is turned off")
@@ -210,7 +210,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
var bhj = df.queryExecution.sparkPlan.collect {
- case j: BroadcastHashJoin => j
+ case j: BroadcastHashJoinExec => j
}
assert(bhj.size === 1,
s"actual query plans do not contain broadcast join: ${df.queryExecution}")
@@ -223,12 +223,12 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1")
df = sql(leftSemiJoinQuery)
bhj = df.queryExecution.sparkPlan.collect {
- case j: BroadcastHashJoin => j
+ case j: BroadcastHashJoinExec => j
}
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
val shj = df.queryExecution.sparkPlan.collect {
- case j: ShuffledHashJoin => j
+ case j: ShuffledHashJoinExec => j
}
assert(shj.size === 1,
"LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 3ddffeb14a..aac5cc6d40 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -485,7 +485,7 @@ abstract class HiveComparisonTest
// also print out the query plans and results for those.
val computedTablesMessages: String = try {
val tablesRead = new TestHiveQueryExecution(query).executedPlan.collect {
- case ts: HiveTableScan => ts.relation.tableName
+ case ts: HiveTableScanExec => ts.relation.tableName
}.toSet
TestHive.reset()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 93d63f2241..467a67259f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkFiles}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
+import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.hive.test.TestHive._
@@ -121,7 +121,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
test("SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN") {
def assertBroadcastNestedLoopJoin(sqlText: String): Unit = {
assert(sql(sqlText).queryExecution.sparkPlan.collect {
- case _: BroadcastNestedLoopJoin => 1
+ case _: BroadcastNestedLoopJoinExec => 1
}.nonEmpty)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
index 6b424d7343..2de429bdab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.expressions.{Cast, EqualTo}
-import org.apache.spark.sql.execution.Project
+import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.hive.test.TestHive
/**
@@ -50,7 +50,7 @@ class HiveTypeCoercionSuite extends HiveComparisonTest {
test("[SPARK-2210] boolean cast on boolean value should be removed") {
val q = "select cast(cast(key=0 as boolean) as boolean) from src"
val project = TestHive.sql(q).queryExecution.sparkPlan.collect {
- case e: Project => e
+ case e: ProjectExec => e
}.head
// No cast expression introduced
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 12f30e2e74..24df73b40e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -151,7 +151,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
val plan = new TestHiveQueryExecution(sql).sparkPlan
val actualOutputColumns = plan.output.map(_.name)
val (actualScannedColumns, actualPartValues) = plan.collect {
- case p @ HiveTableScan(columns, relation, _) =>
+ case p @ HiveTableScanExec(columns, relation, _) =>
val columnNames = columns.map(_.name)
val partValues = if (relation.catalogTable.partitionColumnNames.nonEmpty) {
p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 00b5c8dd41..1a15fb741a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryNode}
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.types.StringType
@@ -111,7 +111,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
}
}
-private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryNode {
+private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
override protected def doExecute(): RDD[InternalRow] = {
child.execute().map { x =>
assert(TaskContext.get() != null) // Make sure that TaskContext is defined.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2984ee99be..1c1f6d910d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,10 +21,10 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.DataSourceScan
-import org.apache.spark.sql.execution.command.ExecutedCommand
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -192,11 +192,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test(s"conversion is working") {
assert(
sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
- case _: HiveTableScan => true
+ case _: HiveTableScanExec => true
}.isEmpty)
assert(
sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
- case _: DataSourceScan => true
+ case _: DataSourceScanExec => true
}.nonEmpty)
}
@@ -307,7 +307,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.sparkPlan match {
- case ExecutedCommand(_: InsertIntoHadoopFsRelation) => // OK
+ case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
@@ -337,7 +337,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.sparkPlan match {
- case ExecutedCommand(_: InsertIntoHadoopFsRelation) => // OK
+ case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan." +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index aa6101f7b7..d271e55467 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -22,10 +22,10 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
-import org.apache.spark.sql.execution.joins.SortMergeJoin
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -93,7 +93,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
- val rdd = plan.find(_.isInstanceOf[DataSourceScan])
+ val rdd = plan.find(_.isInstanceOf[DataSourceScanExec])
assert(rdd.isDefined, plan)
val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
@@ -261,8 +261,8 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
joined.sort("bucketed_table1.k", "bucketed_table2.k"),
df1.join(df2, joinCondition(df1, df2, joinColumns)).sort("df1.k", "df2.k"))
- assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoin])
- val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoin]
+ assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
+ val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
assert(
joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index a15bd227a9..19749a9713 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -152,8 +152,8 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
val physicalPlan = df.queryExecution.sparkPlan
- assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
- assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
+ assert(physicalPlan.collect { case p: execution.ProjectExec => p }.length === 1)
+ assert(physicalPlan.collect { case p: execution.FilterExec => p }.length === 1)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 089cef615f..5378336ff8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -688,7 +688,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
.load(path)
val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst {
- case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+ case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
scan.rdd.asInstanceOf[FileScanRDD]
}