aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-03-26 16:10:35 -0700
committerYin Huai <yhuai@databricks.com>2016-03-26 16:10:35 -0700
commitb547de8a60074ca25c5bec3a24511f8042bdf0ad (patch)
tree4614553f2a85f50736709afe95b4f9d9bec3f3f9 /sql/core
parent8989d3a39657e817918fb4e5fdab172b68b85df6 (diff)
downloadspark-b547de8a60074ca25c5bec3a24511f8042bdf0ad.tar.gz
spark-b547de8a60074ca25c5bec3a24511f8042bdf0ad.tar.bz2
spark-b547de8a60074ca25c5bec3a24511f8042bdf0ad.zip
[SPARK-14116][SQL] Implements buildReader() for ORC data source
## What changes were proposed in this pull request? This PR implements `FileFormat.buildReader()` for our ORC data source. It also fixed several minor styling issues related to `HadoopFsRelation` planning code path. Note that `OrcNewInputFormat` doesn't rely on `OrcNewSplit` for creating `OrcRecordReader`s, plain `FileSplit` is just fine. That's why we can simply create the record reader with the help of `OrcNewInputFormat` and `FileSplit`. ## How was this patch tested? Existing test cases should do the work Author: Cheng Lian <lian@databricks.com> Closes #11936 from liancheng/spark-14116-build-reader-for-orc.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala29
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala1
5 files changed, 23 insertions, 27 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 4943702438..52c8f3ef0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -208,9 +208,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext,
(0 until spec.numBuckets).map { bucketId =>
- bucketedDataMap.get(bucketId).getOrElse {
- t.sqlContext.emptyResult: RDD[InternalRow]
- }
+ bucketedDataMap.getOrElse(bucketId, t.sqlContext.emptyResult: RDD[InternalRow])
})
bucketedRDD
}
@@ -387,7 +385,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
result.setColumn(resultIdx, input.column(inputIdx))
inputIdx += 1
} else {
- require(partitionColumnSchema.fields.filter(_.name.equals(attr.name)).length == 1)
+ require(partitionColumnSchema.fields.count(_.name == attr.name) == 1)
var partitionIdx = 0
partitionColumnSchema.fields.foreach { f => {
if (f.name.equals(attr.name)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index bbe7f4abb1..988c785dbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -32,7 +32,7 @@ case class PartitionedFile(
filePath: String,
start: Long,
length: Long) {
- override def toString(): String = {
+ override def toString: String = {
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
}
}
@@ -44,7 +44,7 @@ case class PartitionedFile(
*
* TODO: This currently does not take locality information about the files into account.
*/
-case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
+case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition
class FileScanRDD(
@transient val sqlContext: SQLContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index de89d5f1fc..4b04fec57d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{DataSourceScan, SparkPlan}
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -56,9 +55,10 @@ import org.apache.spark.sql.types._
*/
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _))
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
if (files.fileFormat.toString == "TestFileFormat" ||
- files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
+ files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
+ files.fileFormat.toString == "ORC") &&
files.sqlContext.conf.parquetFileScan =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
@@ -81,10 +81,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val bucketColumns =
AttributeSet(
files.bucketSpec
- .map(_.bucketColumnNames)
- .getOrElse(Nil)
- .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
- .getOrElse(sys.error(""))))
+ .map(_.bucketColumnNames)
+ .getOrElse(Nil)
+ .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
+ .getOrElse(sys.error(""))))
// Partition keys are not available in the statistics of the files.
val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -101,8 +101,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val readDataColumns =
dataColumns
- .filter(requiredAttributes.contains)
- .filterNot(partitionColumns.contains)
+ .filter(requiredAttributes.contains)
+ .filterNot(partitionColumns.contains)
val prunedDataSchema = readDataColumns.toStructType
logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")
@@ -120,13 +120,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
case Some(bucketing) if files.sqlContext.conf.bucketingEnabled =>
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
val bucketed =
- selectedPartitions
- .flatMap { p =>
- p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
- }.groupBy { f =>
+ selectedPartitions.flatMap { p =>
+ p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
+ }.groupBy { f =>
BucketingUtils
- .getBucketId(new Path(f.filePath).getName)
- .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+ .getBucketId(new Path(f.filePath).getName)
+ .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}
(0 until bucketing.numBuckets).map { bucketId =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 2f2d438f32..d6b84be267 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -321,11 +321,11 @@ private[sql] class DefaultSource
// Try to push down filters when filter push-down is enabled.
val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(ParquetFilters.createFilter(dataSchema, _))
- .reduceOption(FilterApi.and)
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(ParquetFilters.createFilter(dataSchema, _))
+ .reduceOption(FilterApi.and)
} else {
None
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 6af403dec5..5cfc9e9afa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.text
-import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}