aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-08-03 11:19:55 -0700
committerDavies Liu <davies.liu@gmail.com>2016-08-03 11:19:55 -0700
commite6f226c5670d9f332b49ca40ff7b86b81a218d1b (patch)
tree4b0c3899e1026fcf2f48a55bbb08f5dd26c8a4a3 /sql/core/src/test
parentb55f34370f695de355b72c1518b5f2a45c324af0 (diff)
downloadspark-e6f226c5670d9f332b49ca40ff7b86b81a218d1b.tar.gz
spark-e6f226c5670d9f332b49ca40ff7b86b81a218d1b.tar.bz2
spark-e6f226c5670d9f332b49ca40ff7b86b81a218d1b.zip
[SPARK-16596] [SQL] Refactor DataSourceScanExec to do partition discovery at execution instead of planning time
## What changes were proposed in this pull request? Partition discovery is rather expensive, so we should do it at execution time instead of during physical planning. Right now there is not much benefit since ListingFileCatalog will read scan for all partitions at planning time anyways, but this can be optimized in the future. Also, there might be more information for partition pruning not available at planning time. This PR moves a lot of the file scan logic from planning to execution time. All file scan operations are handled by `FileSourceScanExec`, which handles both batched and non-batched file scans. This requires some duplication with `RowDataSourceScanExec`, but is probably worth it so that `FileSourceScanExec` does not need to depend on an input RDD. TODO: In another pr, move DataSourceScanExec to it's own file. ## How was this patch tested? Existing tests (it might be worth adding a test that catalog.listFiles() is delayed until execution, but this can be delayed until there is an actual benefit to doing so). Author: Eric Liang <ekl@databricks.com> Closes #14241 from ericl/refactor.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala11
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala4
3 files changed, 10 insertions, 11 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 18246500f7..09fd750180 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -518,8 +518,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
def getFileScanRDD(df: DataFrame): FileScanRDD = {
df.queryExecution.executedPlan.collect {
- case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
- scan.rdd.asInstanceOf[FileScanRDD]
+ case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
+ scan.inputRDDs().head.asInstanceOf[FileScanRDD]
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 7e83bcbb6e..9dd8d9f804 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -25,7 +25,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
-import org.apache.spark.sql.execution.BatchedDataSourceScanExec
+import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -624,16 +624,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
// donot return batch, because whole stage codegen is disabled for wide table (>200 columns)
val df2 = spark.read.parquet(path)
- assert(df2.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isEmpty,
- "Should not return batch")
+ val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+ assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch)
checkAnswer(df2, df)
// return batch
val columns = Seq.tabulate(9) {i => s"c$i"}
val df3 = df2.selectExpr(columns : _*)
- assert(
- df3.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isDefined,
- "Should return batch")
+ val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+ assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsBatch)
checkAnswer(df3, df.selectExpr(columns : _*))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 9d0a2b3d5b..19c89f5c41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -198,8 +198,8 @@ class FileStreamSinkSuite extends StreamTest {
/** Check some condition on the partitions of the FileScanRDD generated by a DF */
def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
val getFileScanRDD = df.queryExecution.executedPlan.collect {
- case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
- scan.rdd.asInstanceOf[FileScanRDD]
+ case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
+ scan.inputRDDs().head.asInstanceOf[FileScanRDD]
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
}