diff options
author | Michael Armbrust <michael@databricks.com> | 2016-03-07 15:15:10 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-07 15:15:10 -0800 |
commit | e720dda42e806229ccfd970055c7b8a93eb447bf (patch) | |
tree | 641c3d454b638a347adc4c51db8cc69c41b44ac2 /project/MimaExcludes.scala | |
parent | 0eea12a3d956b54bbbd73d21b296868852a04494 (diff) | |
download | spark-e720dda42e806229ccfd970055c7b8a93eb447bf.tar.gz spark-e720dda42e806229ccfd970055c7b8a93eb447bf.tar.bz2 spark-e720dda42e806229ccfd970055c7b8a93eb447bf.zip |
[SPARK-13665][SQL] Separate the concerns of HadoopFsRelation
`HadoopFsRelation` is used for reading most files into Spark SQL. However today this class mixes the concerns of file management, schema reconciliation, scan building, bucketing, partitioning, and writing data. As a result, many data sources are forced to reimplement the same functionality and the various layers have accumulated a fair bit of inefficiency. This PR is a first cut at separating this into several components / interfaces that are each described below. Additionally, all implementations inside of Spark (parquet, csv, json, text, orc, svmlib) have been ported to the new API `FileFormat`. External libraries, such as spark-avro will also need to be ported to work with Spark 2.0.
### HadoopFsRelation
A simple `case class` that acts as a container for all of the metadata required to read from a datasource. All discovery, resolution and merging logic for schemas and partitions has been removed. This an internal representation that no longer needs to be exposed to developers.
```scala
case class HadoopFsRelation(
sqlContext: SQLContext,
location: FileCatalog,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
fileFormat: FileFormat,
options: Map[String, String]) extends BaseRelation
```
### FileFormat
The primary interface that will be implemented by each different format including external libraries. Implementors are responsible for reading a given format and converting it into `InternalRow` as well as writing out an `InternalRow`. A format can optionally return a schema that is inferred from a set of files.
```scala
trait FileFormat {
def inferSchema(
sqlContext: SQLContext,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType]
def prepareWrite(
sqlContext: SQLContext,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory
def buildInternalScan(
sqlContext: SQLContext,
dataSchema: StructType,
requiredColumns: Array[String],
filters: Array[Filter],
bucketSet: Option[BitSet],
inputFiles: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration],
options: Map[String, String]): RDD[InternalRow]
}
```
The current interface is based on what was required to get all the tests passing again, but still mixes a couple of concerns (i.e. `bucketSet` is passed down to the scan instead of being resolved by the planner). Additionally, scans are still returning `RDD`s instead of iterators for single files. In a future PR, bucketing should be removed from this interface and the scan should be isolated to a single file.
### FileCatalog
This interface is used to list the files that make up a given relation, as well as handle directory based partitioning.
```scala
trait FileCatalog {
def paths: Seq[Path]
def partitionSpec(schema: Option[StructType]): PartitionSpec
def allFiles(): Seq[FileStatus]
def getStatus(path: Path): Array[FileStatus]
def refresh(): Unit
}
```
Currently there are two implementations:
- `HDFSFileCatalog` - based on code from the old `HadoopFsRelation`. Infers partitioning by recursive listing and caches this data for performance
- `HiveFileCatalog` - based on the above, but it uses the partition spec from the Hive Metastore.
### ResolvedDataSource
Produces a logical plan given the following description of a Data Source (which can come from DataFrameReader or a metastore):
- `paths: Seq[String] = Nil`
- `userSpecifiedSchema: Option[StructType] = None`
- `partitionColumns: Array[String] = Array.empty`
- `bucketSpec: Option[BucketSpec] = None`
- `provider: String`
- `options: Map[String, String]`
This class is responsible for deciding which of the Data Source APIs a given provider is using (including the non-file based ones). All reconciliation of partitions, buckets, schema from metastores or inference is done here.
### DataSourceAnalysis / DataSourceStrategy
Responsible for analyzing and planning reading/writing of data using any of the Data Source APIs, including:
- pruning the files from partitions that will be read based on filters.
- appending partition columns*
- applying additional filters when a data source can not evaluate them internally.
- constructing an RDD that is bucketed correctly when required*
- sanity checking schema match-up and other analysis when writing.
*In the future we should do that following:
- Break out file handling into its own Strategy as its sufficiently complex / isolated.
- Push the appending of partition columns down in to `FileFormat` to avoid an extra copy / unvectorization.
- Use a custom RDD for scans instead of `SQLNewNewHadoopRDD2`
Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11509 from marmbrus/fileDataSource.
Diffstat (limited to 'project/MimaExcludes.scala')
-rw-r--r-- | project/MimaExcludes.scala | 6 |
1 files changed, 5 insertions, 1 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 983f71684c..45776fbb9f 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -60,7 +60,11 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jsonRDD"), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.load"), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.dialectClassName"), - ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.getSQLDialect") + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.getSQLDialect"), + // SPARK-13664 Replace HadoopFsRelation with FileFormat + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.source.libsvm.LibSVMRelation"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelationProvider"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache") ) ++ Seq( ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory") |