Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala  44
1 file changed, 21 insertions(+), 23 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4b04fec57d..80a9156ddc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -55,11 +55,7 @@ import org.apache.spark.sql.sources._
*/
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
- if (files.fileFormat.toString == "TestFileFormat" ||
- files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
- files.fileFormat.toString == "ORC") &&
- files.sqlContext.conf.parquetFileScan =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
@@ -68,26 +64,28 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)
+ // The attribute names in predicates may differ from those in the schema when the analyzer
+ // is case insensitive. Normalize them to the schema names here so the rest of this
+ // strategy does not need to worry about case sensitivity.
+ val normalizedFilters = filters.map { e =>
+ e transform {
+ case a: AttributeReference =>
+ a.withName(l.output.find(_.semanticEquals(a)).get.name)
+ }
+ }
+
val partitionColumns =
l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
- ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
+ ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
val dataColumns =
l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
- val bucketColumns =
- AttributeSet(
- files.bucketSpec
- .map(_.bucketColumnNames)
- .getOrElse(Nil)
- .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
- .getOrElse(sys.error(""))))
-
// Partition keys are not available in the statistics of the files.
- val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
+ val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters
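
Once the filters are normalized, partitionKeyFilters and dataFilters fall out of plain reference-set checks against the partition columns. A minimal, Spark-free sketch of that idea, using a toy expression type instead of Catalyst (all names below are hypothetical):

    // Toy stand-ins for Catalyst expressions; not Spark's API.
    sealed trait Expr { def references: Set[String] }
    case class Attr(name: String) extends Expr { def references = Set(name) }
    case class GreaterThan(left: Expr, right: Int) extends Expr { def references = left.references }

    object NormalizeSketch {
      // Case-insensitively rewrite each attribute to the canonical schema name,
      // mirroring `a.withName(l.output.find(_.semanticEquals(a)).get.name)` above.
      def normalize(e: Expr, schema: Seq[String]): Expr = e match {
        case Attr(n)           => Attr(schema.find(_.equalsIgnoreCase(n)).getOrElse(n))
        case GreaterThan(l, r) => GreaterThan(normalize(l, schema), r)
      }

      def main(args: Array[String]): Unit = {
        val schema        = Seq("year", "value")   // "year" plays the partition column here
        val partitionCols = Set("year")
        val filters       = Seq(GreaterThan(Attr("YEAR"), 2015), GreaterThan(Attr("Value"), 10))
        val normalized    = filters.map(normalize(_, schema))

        // Same split as partitionKeyFilters / dataFilters above.
        println(normalized.filter(_.references.subsetOf(partitionCols)))          // the "YEAR" filter
        println(normalized.filter(_.references.intersect(partitionCols).isEmpty)) // the "Value" filter
      }
    }
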
@@ -111,8 +109,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val readFile = files.fileFormat.buildReader(
sqlContext = files.sqlContext,
+ dataSchema = files.dataSchema,
partitionSchema = files.partitionSchema,
- dataSchema = prunedDataSchema,
+ requiredSchema = prunedDataSchema,
filters = pushedDownFilters,
options = files.options)
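
Passing both the file's full dataSchema and the pruned requiredSchema makes column pruning the format's job: it reads its native layout and hands back only the requested columns. A rough, self-contained sketch of that contract (toy Row type and names, not the FileFormat API):

    object ReaderContractSketch {
      type Row = Map[String, Any]

      // Stand-in for buildReader: given the full on-disk schema and the columns the
      // query actually needs, return a function that reads a "file" and projects it.
      def buildReader(dataSchema: Seq[String], requiredSchema: Seq[String]): Seq[Row] => Iterator[Row] = {
        require(requiredSchema.forall(dataSchema.contains), "required columns must exist in the file")
        rows => rows.iterator.map(row => requiredSchema.map(c => c -> row(c)).toMap)
      }

      def main(args: Array[String]): Unit = {
        val file = Seq[Row](
          Map("id" -> 1, "name" -> "a", "score" -> 0.5),
          Map("id" -> 2, "name" -> "b", "score" -> 0.9))
        val read = buildReader(dataSchema = Seq("id", "name", "score"), requiredSchema = Seq("id", "score"))
        read(file).foreach(println)   // only the pruned columns survive: Map(id -> 1, score -> 0.5), ...
      }
    }
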
@@ -134,11 +133,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
case _ =>
val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
- logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes")
+ val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
+ logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+ s"open cost is considered as scanning $openCostInBytes bytes.")
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
- assert(file.getLen != 0, file.toString)
(0L to file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
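
The split loop walks each file in maxSplitBytes strides; with the zero-length assert removed, an empty file now yields a single empty split instead of failing planning. A small standalone sketch of the same offset/size arithmetic (splitFile and the literal sizes are illustrative only):

    object SplitSketch {
      case class Split(offset: Long, size: Long)

      // Mirrors `(0L to file.getLen by maxSplitBytes)`: one split per stride,
      // the last split gets whatever remains.
      def splitFile(len: Long, maxSplitBytes: Long): Seq[Split] =
        (0L to len by maxSplitBytes).map { offset =>
          Split(offset, math.min(maxSplitBytes, len - offset))
        }

      def main(args: Array[String]): Unit = {
        println(splitFile(len = 10L, maxSplitBytes = 4L))  // Split(0,4), Split(4,4), Split(8,2)
        println(splitFile(len = 0L,  maxSplitBytes = 4L))  // Split(0,0): empty file, one empty split
      }
    }
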
@@ -153,7 +153,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
/** Add the given file to the current partition. */
def addFile(file: PartitionedFile): Unit = {
- currentSize += file.length
+ currentSize += file.length + openCostInBytes
currentFiles.append(file)
}
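
Charging openCostInBytes on top of every file's length means a partition's byte budget also covers the fixed cost of opening each file, so a pile of tiny files no longer collapses into one oversized partition. A self-contained sketch of that packing loop (names and sizes are made up for illustration):

    import scala.collection.mutable.ArrayBuffer

    object PackingSketch {
      // Pack file sizes into partitions of at most maxSplitBytes "effective" bytes,
      // charging openCostInBytes per file, mirroring `currentSize += file.length + openCostInBytes`.
      def pack(fileSizes: Seq[Long], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
        val partitions  = ArrayBuffer.empty[Seq[Long]]
        val current     = ArrayBuffer.empty[Long]
        var currentSize = 0L

        def closePartition(): Unit = {
          if (current.nonEmpty) partitions += current.toList
          current.clear()
          currentSize = 0L
        }

        fileSizes.foreach { size =>
          if (currentSize + size > maxSplitBytes) closePartition()
          current += size
          currentSize += size + openCostInBytes
        }
        closePartition()
        partitions.toList
      }

      def main(args: Array[String]): Unit = {
        // Sixty 1-byte files would fit in a single 128-byte partition without the open cost;
        // a 4-byte open cost limits each partition to 26 of them instead.
        println(pack(Seq.fill(60)(1L), maxSplitBytes = 128L, openCostInBytes = 4L).map(_.size))
      }
    }
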
@@ -175,17 +175,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
- addFile(file)
- } else {
- addFile(file)
}
+ addFile(file)
}
closePartition()
partitions
}
val scan =
- DataSourceScan(
+ DataSourceScan.create(
readDataColumns ++ partitionColumns,
new FileScanRDD(
files.sqlContext,