diff options
author | Eric Liang <ekl@databricks.com> | 2016-08-03 11:19:55 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-08-03 11:19:55 -0700 |
commit | e6f226c5670d9f332b49ca40ff7b86b81a218d1b (patch) | |
tree | 4b0c3899e1026fcf2f48a55bbb08f5dd26c8a4a3 /sql/hive | |
parent | b55f34370f695de355b72c1518b5f2a45c324af0 (diff) | |
download | spark-e6f226c5670d9f332b49ca40ff7b86b81a218d1b.tar.gz spark-e6f226c5670d9f332b49ca40ff7b86b81a218d1b.tar.bz2 spark-e6f226c5670d9f332b49ca40ff7b86b81a218d1b.zip |
[SPARK-16596] [SQL] Refactor DataSourceScanExec to do partition discovery at execution instead of planning time
## What changes were proposed in this pull request?
Partition discovery is rather expensive, so we should do it at execution time instead of during physical planning. Right now there is not much benefit since ListingFileCatalog will read scan for all partitions at planning time anyways, but this can be optimized in the future. Also, there might be more information for partition pruning not available at planning time.
This PR moves a lot of the file scan logic from planning to execution time. All file scan operations are handled by `FileSourceScanExec`, which handles both batched and non-batched file scans. This requires some duplication with `RowDataSourceScanExec`, but is probably worth it so that `FileSourceScanExec` does not need to depend on an input RDD.
TODO: In another pr, move DataSourceScanExec to it's own file.
## How was this patch tested?
Existing tests (it might be worth adding a test that catalog.listFiles() is delayed until execution, but this can be delayed until there is an actual benefit to doing so).
Author: Eric Liang <ekl@databricks.com>
Closes #14241 from ericl/refactor.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 4 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala | 4 |
2 files changed, 4 insertions, 4 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 8d161a3c46..ca2ec9f6a5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -358,11 +358,11 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet df1.write.parquet(tableDir.getAbsolutePath) val agged = spark.table("bucketed_table").groupBy("i").count() - val error = intercept[RuntimeException] { + val error = intercept[Exception] { agged.count() } - assert(error.toString contains "Invalid bucket file") + assert(error.getCause().toString contains "Invalid bucket file") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 047b08c4cc..27bb9676e9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -862,8 +862,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .load(path) val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst { - case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] => - scan.rdd.asInstanceOf[FileScanRDD] + case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] => + scan.inputRDDs().head.asInstanceOf[FileScanRDD] } val partitions = fileScanRDD.partitions |