author     Cheng Lian <lian@databricks.com>    2016-03-26 16:10:35 -0700
committer  Yin Huai <yhuai@databricks.com>     2016-03-26 16:10:35 -0700
commit     b547de8a60074ca25c5bec3a24511f8042bdf0ad
tree       4614553f2a85f50736709afe95b4f9d9bec3f3f9 /sql/core
parent     8989d3a39657e817918fb4e5fdab172b68b85df6
[SPARK-14116][SQL] Implements buildReader() for ORC data source
## What changes were proposed in this pull request?
This PR implements `FileFormat.buildReader()` for our ORC data source. It also fixes several minor styling issues in the `HadoopFsRelation` planning code path.
Note that `OrcNewInputFormat` doesn't rely on `OrcNewSplit` to create `OrcRecordReader`s; a plain `FileSplit` is sufficient. That's why we can simply create the record reader from `OrcNewInputFormat` and a `FileSplit`.
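For illustration, here is a minimal sketch of that idea (not the exact code in this PR): it builds an ORC record reader from `OrcNewInputFormat` and a plain mapreduce `FileSplit`. The file path, split offsets, and bare `Configuration` are placeholder assumptions; inside `buildReader()` they would come from the `PartitionedFile` being scanned.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Placeholder file coordinates; in buildReader() these come from the PartitionedFile being read.
val conf = new Configuration()
val orcPath = new Path("/tmp/data/part-00000.orc")
val (start, length) = (0L, 128L * 1024 * 1024)

// A plain mapreduce FileSplit is enough -- no OrcNewSplit required.
val split = new FileSplit(orcPath, start, length, Array.empty[String])
val attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID())

// OrcNewInputFormat hands back a record reader over (NullWritable, OrcStruct) pairs.
val reader = new OrcNewInputFormat().createRecordReader(split, attemptContext)
reader.initialize(split, attemptContext)
while (reader.nextKeyValue()) {
  println(reader.getCurrentValue) // one OrcStruct per record
}
reader.close()
```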
## How was this patch tested?
Existing test cases should cover this change.
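Concretely, a round trip of the sort the existing ORC suites exercise looks roughly like the sketch below; the path is hypothetical and a `SQLContext` with ORC support (Hive support in this Spark version) is assumed.

```scala
// Hypothetical round trip: write a small DataFrame as ORC, then read it back,
// which exercises FileFormat.buildReader() when the file scan path is enabled.
val path = "/tmp/spark-14116-orc-roundtrip"
sqlContext.range(10).selectExpr("id", "id * 2 AS doubled").write.mode("overwrite").orc(path)

val df = sqlContext.read.orc(path)
assert(df.count() == 10)
assert(df.where("doubled = 18").count() == 1)
```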
Author: Cheng Lian <lian@databricks.com>
Closes #11936 from liancheng/spark-14116-build-reader-for-orc.
Diffstat (limited to 'sql/core')
5 files changed, 23 insertions, 27 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 4943702438..52c8f3ef0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -208,9 +208,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext,
           (0 until spec.numBuckets).map { bucketId =>
-            bucketedDataMap.get(bucketId).getOrElse {
-              t.sqlContext.emptyResult: RDD[InternalRow]
-            }
+            bucketedDataMap.getOrElse(bucketId, t.sqlContext.emptyResult: RDD[InternalRow])
           })
         bucketedRDD
       }
@@ -387,7 +385,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         result.setColumn(resultIdx, input.column(inputIdx))
         inputIdx += 1
       } else {
-        require(partitionColumnSchema.fields.filter(_.name.equals(attr.name)).length == 1)
+        require(partitionColumnSchema.fields.count(_.name == attr.name) == 1)
         var partitionIdx = 0
         partitionColumnSchema.fields.foreach { f => {
           if (f.name.equals(attr.name)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index bbe7f4abb1..988c785dbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -32,7 +32,7 @@ case class PartitionedFile(
     filePath: String,
     start: Long,
     length: Long) {
-  override def toString(): String = {
+  override def toString: String = {
     s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
   }
 }
@@ -44,7 +44,7 @@ case class PartitionedFile(
  *
  * TODO: This currently does not take locality information about the files into account.
  */
-case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
+case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition
 
 class FileScanRDD(
     @transient val sqlContext: SQLContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index de89d5f1fc..4b04fec57d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{DataSourceScan, SparkPlan}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
 
 /**
  * A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -56,9 +55,10 @@ import org.apache.spark.sql.types._
  */
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _))
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
       if (files.fileFormat.toString == "TestFileFormat" ||
-        files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
+        files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
+        files.fileFormat.toString == "ORC") &&
         files.sqlContext.conf.parquetFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
@@ -81,10 +81,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val bucketColumns =
         AttributeSet(
           files.bucketSpec
-          .map(_.bucketColumnNames)
-          .getOrElse(Nil)
-          .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
-          .getOrElse(sys.error(""))))
+            .map(_.bucketColumnNames)
+            .getOrElse(Nil)
+            .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
+              .getOrElse(sys.error(""))))
 
       // Partition keys are not available in the statistics of the files.
       val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -101,8 +101,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val readDataColumns =
         dataColumns
-        .filter(requiredAttributes.contains)
-        .filterNot(partitionColumns.contains)
+          .filter(requiredAttributes.contains)
+          .filterNot(partitionColumns.contains)
       val prunedDataSchema = readDataColumns.toStructType
       logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")
@@ -120,13 +120,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         case Some(bucketing) if files.sqlContext.conf.bucketingEnabled =>
           logInfo(s"Planning with ${bucketing.numBuckets} buckets")
           val bucketed =
-            selectedPartitions
-              .flatMap { p =>
-                p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
-              }.groupBy { f =>
+            selectedPartitions.flatMap { p =>
+              p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
+            }.groupBy { f =>
               BucketingUtils
-              .getBucketId(new Path(f.filePath).getName)
-              .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+                .getBucketId(new Path(f.filePath).getName)
+                .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
             }
 
           (0 until bucketing.numBuckets).map { bucketId =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 2f2d438f32..d6b84be267 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -321,11 +321,11 @@ private[sql] class DefaultSource
     // Try to push down filters when filter push-down is enabled.
     val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
       filters
-      // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-      // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-      // is used here.
-      .flatMap(ParquetFilters.createFilter(dataSchema, _))
-      .reduceOption(FilterApi.and)
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
     } else {
       None
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 6af403dec5..5cfc9e9afa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
-import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}