Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 44
1 file changed, 21 insertions, 23 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4b04fec57d..80a9156ddc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -55,11 +55,7 @@ import org.apache.spark.sql.sources._
  */
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
-      if (files.fileFormat.toString == "TestFileFormat" ||
-        files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
-        files.fileFormat.toString == "ORC") &&
-        files.sqlContext.conf.parquetFileScan =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read
@@ -68,26 +64,28 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       //  - filters that need to be evaluated again after the scan
       val filterSet = ExpressionSet(filters)
 
+      // The attribute name of predicate could be different than the one in schema in case of
+      // case insensitive, we should change them to match the one in schema, so we donot need to
+      // worry about case sensitivity anymore.
+      val normalizedFilters = filters.map { e =>
+        e transform {
+          case a: AttributeReference =>
+            a.withName(l.output.find(_.semanticEquals(a)).get.name)
+        }
+      }
+
       val partitionColumns =
         l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
+        ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
         l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
 
-      val bucketColumns =
-        AttributeSet(
-          files.bucketSpec
-            .map(_.bucketColumnNames)
-            .getOrElse(Nil)
-            .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
-              .getOrElse(sys.error(""))))
-
       // Partition keys are not available in the statistics of the files.
-      val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
+      val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
 
       // Predicates with both partition keys and attributes need to be evaluated after the scan.
       val afterScanFilters = filterSet -- partitionKeyFilters
@@ -111,8 +109,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val readFile = files.fileFormat.buildReader(
         sqlContext = files.sqlContext,
+        dataSchema = files.dataSchema,
         partitionSchema = files.partitionSchema,
-        dataSchema = prunedDataSchema,
+        requiredSchema = prunedDataSchema,
         filters = pushedDownFilters,
         options = files.options)
@@ -134,11 +133,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         case _ =>
           val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
-          logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes")
+          val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
+          logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+            s"open cost is considered as scanning $openCostInBytes bytes.")
           val splitFiles = selectedPartitions.flatMap { partition =>
             partition.files.flatMap { file =>
-              assert(file.getLen != 0, file.toString)
               (0L to file.getLen by maxSplitBytes).map { offset =>
                 val remaining = file.getLen - offset
                 val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
@@ -153,7 +153,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
           /** Add the given file to the current partition. */
           def addFile(file: PartitionedFile): Unit = {
-            currentSize += file.length
+            currentSize += file.length + openCostInBytes
             currentFiles.append(file)
           }
@@ -175,17 +175,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
           splitFiles.foreach { file =>
             if (currentSize + file.length > maxSplitBytes) {
               closePartition()
-              addFile(file)
-            } else {
-              addFile(file)
             }
+            addFile(file)
           }
           closePartition()
           partitions
       }
 
       val scan =
-        DataSourceScan(
+        DataSourceScan.create(
           readDataColumns ++ partitionColumns,
           new FileScanRDD(
             files.sqlContext,
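Note on the normalizedFilters hunk: the new code rewrites the attribute names referenced by pushed-down predicates to the exact spelling used in the relation's output, so the later subset/intersect checks against the partition columns no longer need to care about case sensitivity. The standalone Scala sketch below illustrates the idea only; the Filter case class and the equalsIgnoreCase lookup are illustrative stand-ins, not Spark's AttributeReference/semanticEquals machinery.

object NormalizeFilterNames {
  // Illustrative stand-in for a predicate that references a single column.
  final case class Filter(column: String, predicate: String)

  // Rewrite each filter's column name to the exact spelling used by the
  // relation's output schema, comparing names case-insensitively.
  def normalize(filters: Seq[Filter], outputColumns: Seq[String]): Seq[Filter] =
    filters.map { f =>
      outputColumns.find(_.equalsIgnoreCase(f.column)) match {
        case Some(canonical) => f.copy(column = canonical)
        case None            => f // leave names that do not resolve untouched
      }
    }

  def main(args: Array[String]): Unit = {
    val schema  = Seq("eventDate", "userId")
    val filters = Seq(Filter("EVENTDATE", ">= '2016-03-01'"))
    println(normalize(filters, schema)) // List(Filter(eventDate,>= '2016-03-01'))
  }
}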
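Note on the buildReader hunk: the reader is now handed both the file's full dataSchema and the pruned requiredSchema, so a file format can locate columns in the physical file layout while materializing only the columns the query actually needs. A rough standalone sketch of that split, with Seq[String] schemas and Map-based rows standing in for Spark's StructType and InternalRow:

object RequiredSchemaProjection {
  type Row = Map[String, Any]

  // Build a reader that understands the full file schema (dataSchema) but
  // surfaces only the pruned requiredSchema columns to the query.
  def buildReader(dataSchema: Seq[String], requiredSchema: Seq[String]): Row => Row = {
    require(requiredSchema.forall(dataSchema.contains),
      "required columns must exist in the file schema")
    (fileRow: Row) => requiredSchema.map(c => c -> fileRow(c)).toMap
  }

  def main(args: Array[String]): Unit = {
    val read = buildReader(
      dataSchema = Seq("id", "name", "payload"),
      requiredSchema = Seq("id", "name"))
    // Prints Map(id -> 1, name -> a); the payload column is never surfaced.
    println(read(Map("id" -> 1, "name" -> "a", "payload" -> "big blob")))
  }
}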
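Note on the bin-packing hunks: each file is first cut into splits of at most maxSplitBytes (conf.filesMaxPartitionBytes), and the splits are then packed greedily into read partitions; charging every added split an extra openCostInBytes (conf.filesOpenCostInBytes) keeps a partition from accumulating a long tail of tiny files. The standalone Scala sketch below is a simplified rendering of that algorithm, not Spark's PartitionedFile/FilePartition code; FileSplit and the until-based splitting are illustrative choices.

object BinPackFileSplits {
  // Illustrative stand-in for Spark's PartitionedFile.
  final case class FileSplit(path: String, offset: Long, length: Long)

  def pack(
      files: Seq[(String, Long)], // (path, file length in bytes)
      maxSplitBytes: Long,
      openCostInBytes: Long): Seq[Seq[FileSplit]] = {
    // 1. Cut every file into splits of at most maxSplitBytes.
    val splits = files.flatMap { case (path, len) =>
      (0L until len by maxSplitBytes).map { offset =>
        FileSplit(path, offset, math.min(maxSplitBytes, len - offset))
      }
    }

    // 2. Greedily pack splits into partitions; every split also pays an
    //    open cost so that many small files do not pile into one partition.
    val partitions = Seq.newBuilder[Seq[FileSplit]]
    var current = Vector.empty[FileSplit]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current
      current = Vector.empty
      currentSize = 0L
    }

    splits.foreach { split =>
      if (currentSize + split.length > maxSplitBytes) closePartition()
      current :+= split
      currentSize += split.length + openCostInBytes
    }
    closePartition()
    partitions.result()
  }

  def main(args: Array[String]): Unit = {
    val parts = pack(Seq("a" -> 300L, "b" -> 50L, "c" -> 60L),
      maxSplitBytes = 128L, openCostInBytes = 16L)
    parts.zipWithIndex.foreach { case (p, i) => println(s"partition $i: $p") }
  }
}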