diff options
Diffstat (limited to 'sql/core/src/test')
3 files changed, 10 insertions, 11 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 18246500f7..09fd750180 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem} import org.apache.hadoop.mapreduce.Job -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -518,8 +518,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi def getFileScanRDD(df: DataFrame): FileScanRDD = { df.queryExecution.executedPlan.collect { - case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] => - scan.rdd.asInstanceOf[FileScanRDD] + case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] => + scan.inputRDDs().head.asInstanceOf[FileScanRDD] }.headOption.getOrElse { fail(s"No FileScan in query\n${df.queryExecution}") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 7e83bcbb6e..9dd8d9f804 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -25,7 +25,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow -import org.apache.spark.sql.execution.BatchedDataSourceScanExec +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -624,16 +624,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext // donot return batch, because whole stage codegen is disabled for wide table (>200 columns) val df2 = spark.read.parquet(path) - assert(df2.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isEmpty, - "Should not return batch") + val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch) checkAnswer(df2, df) // return batch val columns = Seq.tabulate(9) {i => s"c$i"} val df3 = df2.selectExpr(columns : _*) - assert( - df3.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isDefined, - "Should return batch") + val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsBatch) checkAnswer(df3, df.selectExpr(columns : _*)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 9d0a2b3d5b..19c89f5c41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -198,8 +198,8 @@ class FileStreamSinkSuite extends StreamTest { /** Check some condition on the partitions of the FileScanRDD generated by a DF */ def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = { val getFileScanRDD = df.queryExecution.executedPlan.collect { - case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] => - scan.rdd.asInstanceOf[FileScanRDD] + case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] => + scan.inputRDDs().head.asInstanceOf[FileScanRDD] }.headOption.getOrElse { fail(s"No FileScan in query\n${df.queryExecution}") } |