author    | Michael Armbrust <michael@databricks.com> | 2016-03-14 19:21:12 -0700
committer | Michael Armbrust <michael@databricks.com> | 2016-03-14 19:21:12 -0700
commit    | 17eec0a71ba8713c559d641e3f43a1be726b037c (patch)
tree      | 6f2a6c5a7aef585ef58bb2d6fba4f63bc58f167a /mllib
parent    | 992142b87ed5b507493e4f9fac3f72ba14fafbbc (diff)
[SPARK-13664][SQL] Add a strategy for planning partitioned and bucketed scans of files
This PR adds a new strategy, `FileSourceStrategy`, that can be used for planning scans of collections of files that might be partitioned or bucketed.
Compared with the existing planning logic in `DataSourceStrategy`, this version has the following desirable properties:
- It removes the need to have `RDD`, `broadcastedHadoopConf` and other distributed concerns in the public API of `org.apache.spark.sql.sources.FileFormat`
- Partition column appending is delegated to the format to avoid an extra copy / devectorization when appending partition columns
- It minimizes the amount of data that is shipped to each executor (i.e. it does not send the whole list of files to every worker in the form of a Hadoop conf)
- It natively supports bucketing files into partitions, and thus does not require coalescing / creating a `UnionRDD` with the correct partitioning.
- Small files are automatically coalesced into fewer tasks using an approximate bin-packing algorithm.
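The coalescing of small files described above can be sketched as a greedy first-fit bin-packing pass. This is an illustrative approximation, not Spark's actual implementation; the `FileEntry`, `packFiles`, and `maxBytesPerTask` names are hypothetical:

```scala
// Hypothetical sketch of approximate bin-packing: group files into read
// tasks so that no task exceeds a byte budget, keeping task count low.
case class FileEntry(path: String, sizeInBytes: Long)

def packFiles(files: Seq[FileEntry], maxBytesPerTask: Long): Seq[Seq[FileEntry]] = {
  val tasks = scala.collection.mutable.ArrayBuffer.empty[Seq[FileEntry]]
  var current = scala.collection.mutable.ArrayBuffer.empty[FileEntry]
  var currentSize = 0L
  // Sort descending so large files seed their own tasks first.
  files.sortBy(-_.sizeInBytes).foreach { f =>
    if (currentSize + f.sizeInBytes > maxBytesPerTask && current.nonEmpty) {
      tasks += current.toSeq
      current = scala.collection.mutable.ArrayBuffer.empty[FileEntry]
      currentSize = 0L
    }
    current += f
    currentSize += f.sizeInBytes
  }
  if (current.nonEmpty) tasks += current.toSeq
  tasks.toSeq
}
```

Under this scheme, four files of 60, 30, 20, and 10 bytes with a 64-byte budget collapse into two tasks instead of four.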
Currently only a testing source is planned / tested using this strategy. In follow-up PRs we will port the existing formats to this API.
A stub for `FileScanRDD` is also added, but most methods remain unimplemented.
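The idea behind `FileScanRDD` is that each RDD partition holds a bucket of files and a task iterates their rows in sequence. A minimal sketch of that shape, with simplified stand-ins for the real classes (the fields and the `readPartition` helper here are illustrative, not the stub's actual API):

```scala
// Illustrative stand-ins: one RDD partition = a group of files (or file
// slices), and a task concatenates the per-file row iterators.
case class PartitionedFile(path: String, start: Long, length: Long)
case class FilePartition(index: Int, files: Seq[PartitionedFile])

// A task would chain the iterators produced by the format's reader:
def readPartition[T](
    partition: FilePartition,
    readFile: PartitionedFile => Iterator[T]): Iterator[T] =
  partition.files.iterator.flatMap(readFile)
```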
Other minor cleanups:
- Partition pruning is pushed into `FileCatalog` so both the new and old code paths can use this logic. This will also allow future implementations to use indexes or other tricks (e.g. a MySQL metastore)
- The partitions from the `FileCatalog` now propagate information about file sizes all the way up to the planner so we can intelligently spread files out.
- `Array` -> `Seq` in some internal APIs to avoid unnecessary `toArray` calls
- Rename `Partition` to `PartitionDirectory` to differentiate partitions used earlier in pruning from those where we have already enumerated the files and their sizes.
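The pruning step above can be pictured as filtering the catalog's partition directories by a predicate over their partition-column values. A simplified sketch, assuming a map-based `PartitionDirectory` rather than Spark's Catalyst expressions:

```scala
// Simplified model: a partition directory carries its partition-column
// values and the files it contains; pruning keeps only matching ones.
case class PartitionDirectory(values: Map[String, String], files: Seq[String])

def prunePartitions(
    partitions: Seq[PartitionDirectory],
    predicate: Map[String, String] => Boolean): Seq[PartitionDirectory] =
  partitions.filter(p => predicate(p.values))
```

Because pruning happens before file enumeration reaches the planner, non-matching directories never contribute files to the scan.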
Author: Michael Armbrust <michael@databricks.com>
Closes #11646 from marmbrus/fileStrategy.
Diffstat (limited to 'mllib')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 2 |
1 file changed, 1 insertion(+), 1 deletion(-)
```diff
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 976343ed96..13a13f0a7e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -150,7 +150,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       requiredColumns: Array[String],
       filters: Array[Filter],
       bucketSet: Option[BitSet],
-      inputFiles: Array[FileStatus],
+      inputFiles: Seq[FileStatus],
       broadcastedConf: Broadcast[SerializableConfiguration],
       options: Map[String, String]): RDD[InternalRow] = {
     // TODO: This does not handle cases where column pruning has been performed.
```