From 37f3be5d29192db0a54f6c4699237b149bd0ecae Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 22 Jun 2016 18:19:07 -0700 Subject: [SPARK-16159][SQL] Move RDD creation logic from FileSourceStrategy.apply ## What changes were proposed in this pull request? We embed partitioning logic in FileSourceStrategy.apply, making the function very long. This is a small refactoring to move it into its own functions. Eventually we would be able to move the partitioning functions into a physical operator, rather than doing it in physical planning. ## How was this patch tested? This is a simple code move. Author: Reynold Xin Closes #13862 from rxin/SPARK-16159. --- .../sql/execution/datasources/FileScanRDD.scala | 26 ++- .../execution/datasources/FileSourceStrategy.scala | 240 ++++++++++++--------- 2 files changed, 154 insertions(+), 112 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index f7f68b1eb9..1443057d5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -27,9 +27,14 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.vectorized.ColumnarBatch /** - * A single file that should be read, along with partition column values that - * need to be prepended to each row. The reading should start at the first - * valid record found after `start`. + * A part (i.e. "block") of a single file that should be read, along with partition column values + * that need to be prepended to each row. + * + * @param partitionValues value of partition columns to be prepended to each row. + * @param filePath path of the file to read + * @param start the beginning offset (in bytes) of the block. + * @param length number of bytes to read. + * @param locations locality information (list of nodes that have the data). */ case class PartitionedFile( partitionValues: InternalRow, @@ -43,13 +48,14 @@ case class PartitionedFile( } /** - * A collection of files that should be read as a single task possibly from multiple partitioned - * directories. - * - * TODO: This currently does not take locality information about the files into account. + * A collection of file blocks that should be read as a single task + * (possibly from multiple partitioned directories). */ case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition +/** + * An RDD that scans a list of file partitions. + */ class FileScanRDD( @transient private val sparkSession: SparkSession, readFunction: (PartitionedFile) => Iterator[InternalRow], @@ -88,8 +94,8 @@ class FileScanRDD( private[this] var currentFile: PartitionedFile = null private[this] var currentIterator: Iterator[Object] = null - def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator() - def next() = { + def hasNext: Boolean = (currentIterator != null && currentIterator.hasNext) || nextIterator() + def next(): Object = { val nextElement = currentIterator.next() // TODO: we should have a better separation of row based and batch based scan, so that we // don't need to run this `if` for every record. 
@@ -120,7 +126,7 @@ class FileScanRDD( } } - override def close() = { + override def close(): Unit = { updateBytesRead() updateBytesReadWithFileSize() InputFileNameHolder.unsetInputFileName() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 13a86bfb38..04f166f8ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -22,8 +22,9 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -57,7 +58,7 @@ import org.apache.spark.sql.execution.SparkPlan private[sql] object FileSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projects, filters, - l @ LogicalRelation(files: HadoopFsRelation, _, table)) => + l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) => // Filters on this relation fall into four categories based on where we can use them to avoid // reading unneeded data: // - partition keys only - used to prune directories to read @@ -77,14 +78,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { } val partitionColumns = - l.resolve(files.partitionSchema, files.sparkSession.sessionState.analyzer.resolver) + l.resolve( + fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver) val partitionSet = AttributeSet(partitionColumns) val partitionKeyFilters = ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") val dataColumns = - l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver) + l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver) // Partition keys are not available in the statistics of the files. 
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) @@ -93,7 +95,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val afterScanFilters = filterSet -- partitionKeyFilters logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}") - val selectedPartitions = files.location.listFiles(partitionKeyFilters.toSeq) + val selectedPartitions = fsRelation.location.listFiles(partitionKeyFilters.toSeq) val filterAttributes = AttributeSet(afterScanFilters) val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects @@ -109,113 +111,35 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") - val readFile = files.fileFormat.buildReaderWithPartitionValues( - sparkSession = files.sparkSession, - dataSchema = files.dataSchema, - partitionSchema = files.partitionSchema, - requiredSchema = prunedDataSchema, - filters = pushedDownFilters, - options = files.options, - hadoopConf = files.sparkSession.sessionState.newHadoopConfWithOptions(files.options)) - - val plannedPartitions = files.bucketSpec match { - case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled => - logInfo(s"Planning with ${bucketing.numBuckets} buckets") - val bucketed = - selectedPartitions.flatMap { p => - p.files.map { f => - val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen) - PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts) - } - }.groupBy { f => - BucketingUtils - .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) - } - - (0 until bucketing.numBuckets).map { bucketId => - FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil)) - } - + val readFile: (PartitionedFile) => Iterator[InternalRow] = + fsRelation.fileFormat.buildReaderWithPartitionValues( + sparkSession = fsRelation.sparkSession, + dataSchema = fsRelation.dataSchema, + partitionSchema = fsRelation.partitionSchema, + requiredSchema = prunedDataSchema, + filters = pushedDownFilters, + options = fsRelation.options, + hadoopConf = + fsRelation.sparkSession.sessionState.newHadoopConfWithOptions(fsRelation.options)) + + val rdd = fsRelation.bucketSpec match { + case Some(bucketing) if fsRelation.sparkSession.sessionState.conf.bucketingEnabled => + createBucketedReadRDD(bucketing, readFile, selectedPartitions, fsRelation) case _ => - val defaultMaxSplitBytes = files.sparkSession.sessionState.conf.filesMaxPartitionBytes - val openCostInBytes = files.sparkSession.sessionState.conf.filesOpenCostInBytes - val defaultParallelism = files.sparkSession.sparkContext.defaultParallelism - val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum - val bytesPerCore = totalBytes / defaultParallelism - val maxSplitBytes = Math.min(defaultMaxSplitBytes, - Math.max(openCostInBytes, bytesPerCore)) - logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + - s"open cost is considered as scanning $openCostInBytes bytes.") - - val splitFiles = selectedPartitions.flatMap { partition => - partition.files.flatMap { file => - val blockLocations = getBlockLocations(file) - if (files.fileFormat.isSplitable(files.sparkSession, files.options, file.getPath)) { - (0L until file.getLen by maxSplitBytes).map { offset => - val remaining = file.getLen - offset - val size = if (remaining > maxSplitBytes) 
maxSplitBytes else remaining - val hosts = getBlockHosts(blockLocations, offset, size) - PartitionedFile( - partition.values, file.getPath.toUri.toString, offset, size, hosts) - } - } else { - val hosts = getBlockHosts(blockLocations, 0, file.getLen) - Seq(PartitionedFile( - partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts)) - } - } - }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) - - val partitions = new ArrayBuffer[FilePartition] - val currentFiles = new ArrayBuffer[PartitionedFile] - var currentSize = 0L - - /** Add the given file to the current partition. */ - def addFile(file: PartitionedFile): Unit = { - currentSize += file.length + openCostInBytes - currentFiles.append(file) - } - - /** Close the current partition and move to the next. */ - def closePartition(): Unit = { - if (currentFiles.nonEmpty) { - val newPartition = - FilePartition( - partitions.size, - currentFiles.toArray.toSeq) // Copy to a new Array. - partitions.append(newPartition) - } - currentFiles.clear() - currentSize = 0 - } - - // Assign files to partitions using "First Fit Decreasing" (FFD) - // TODO: consider adding a slop factor here? - splitFiles.foreach { file => - if (currentSize + file.length > maxSplitBytes) { - closePartition() - } - addFile(file) - } - closePartition() - partitions + createNonBucketedReadRDD(readFile, selectedPartitions, fsRelation) } val meta = Map( - "Format" -> files.fileFormat.toString, + "Format" -> fsRelation.fileFormat.toString, "ReadSchema" -> prunedDataSchema.simpleString, PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"), - INPUT_PATHS -> files.location.paths.mkString(", ")) + INPUT_PATHS -> fsRelation.location.paths.mkString(", ")) val scan = DataSourceScanExec.create( readDataColumns ++ partitionColumns, - new FileScanRDD( - files.sparkSession, - readFile, - plannedPartitions), - files, + rdd, + fsRelation, meta, table) @@ -232,6 +156,118 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { case _ => Nil } + /** + * Create an RDD for bucketed reads. + * The non-bucketed variant of this function is [[createNonBucketedReadRDD]]. + * + * The algorithm is pretty simple: each RDD partition being returned should include all the files + * with the same bucket id from all the given Hive partitions. + * + * @param bucketSpec the bucketing spec. + * @param readFile a function to read each (part of a) file. + * @param selectedPartitions Hive-style partition that are part of the read. + * @param fsRelation [[HadoopFsRelation]] associated with the read. + */ + private def createBucketedReadRDD( + bucketSpec: BucketSpec, + readFile: (PartitionedFile) => Iterator[InternalRow], + selectedPartitions: Seq[Partition], + fsRelation: HadoopFsRelation): RDD[InternalRow] = { + logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") + val bucketed = + selectedPartitions.flatMap { p => + p.files.map { f => + val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen) + PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts) + } + }.groupBy { f => + BucketingUtils + .getBucketId(new Path(f.filePath).getName) + .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + } + + val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId => + FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil)) + } + + new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + } + + /** + * Create an RDD for non-bucketed reads. + * The bucketed variant of this function is [[createBucketedReadRDD]]. 
+ * + * @param readFile a function to read each (part of a) file. + * @param selectedPartitions Hive-style partition that are part of the read. + * @param fsRelation [[HadoopFsRelation]] associated with the read. + */ + private def createNonBucketedReadRDD( + readFile: (PartitionedFile) => Iterator[InternalRow], + selectedPartitions: Seq[Partition], + fsRelation: HadoopFsRelation): RDD[InternalRow] = { + val defaultMaxSplitBytes = + fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes + val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism + val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum + val bytesPerCore = totalBytes / defaultParallelism + + val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"open cost is considered as scanning $openCostInBytes bytes.") + + val splitFiles = selectedPartitions.flatMap { partition => + partition.files.flatMap { file => + val blockLocations = getBlockLocations(file) + if (fsRelation.fileFormat.isSplitable( + fsRelation.sparkSession, fsRelation.options, file.getPath)) { + (0L until file.getLen by maxSplitBytes).map { offset => + val remaining = file.getLen - offset + val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining + val hosts = getBlockHosts(blockLocations, offset, size) + PartitionedFile( + partition.values, file.getPath.toUri.toString, offset, size, hosts) + } + } else { + val hosts = getBlockHosts(blockLocations, 0, file.getLen) + Seq(PartitionedFile( + partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts)) + } + } + }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + + val partitions = new ArrayBuffer[FilePartition] + val currentFiles = new ArrayBuffer[PartitionedFile] + var currentSize = 0L + + /** Close the current partition and move to the next. */ + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + val newPartition = + FilePartition( + partitions.size, + currentFiles.toArray.toSeq) // Copy to a new Array. + partitions.append(newPartition) + } + currentFiles.clear() + currentSize = 0 + } + + // Assign files to partitions using "First Fit Decreasing" (FFD) + // TODO: consider adding a slop factor here? + splitFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { + closePartition() + } + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles.append(file) + } + closePartition() + + new FileScanRDD(fsRelation.sparkSession, readFile, partitions) + } + private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match { case f: LocatedFileStatus => f.getBlockLocations case f => Array.empty[BlockLocation] -- cgit v1.2.3
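The two helper functions this patch introduces are easiest to see in isolation. First, `createBucketedReadRDD` groups every selected file by the bucket id encoded in its file name and emits exactly one `FilePartition` per bucket id, including empty ones, so bucket *b* of the scan always lines up with bucket *b* of the table. The sketch below mirrors that grouping outside Spark; `BucketedFile`, `BucketPartition`, `groupByBucket`, and the `-<id>.parquet` naming convention are made up for illustration (the real code uses `PartitionedFile`, `FilePartition`, and `BucketingUtils.getBucketId`, and Spark's actual bucket file names look different).

```scala
object BucketGroupingSketch {
  // Hypothetical stand-ins for Spark's PartitionedFile / FilePartition.
  case class BucketedFile(fileName: String, length: Long)
  case class BucketPartition(bucketId: Int, files: Seq[BucketedFile])

  // Assumed bucket-id extraction: real Spark uses BucketingUtils.getBucketId to parse
  // its own file-name suffix; here we simply read a trailing "-<id>.parquet".
  private def bucketId(fileName: String): Option[Int] =
    "-(\\d+)\\.parquet$".r.findFirstMatchIn(fileName).map(_.group(1).toInt)

  def groupByBucket(files: Seq[BucketedFile], numBuckets: Int): Seq[BucketPartition] = {
    // Every file must carry a bucket id; otherwise the table layout is corrupt.
    val grouped: Map[Int, Seq[BucketedFile]] = files.groupBy { f =>
      bucketId(f.fileName).getOrElse(sys.error(s"Invalid bucket file ${f.fileName}"))
    }
    // One partition per bucket id; ids with no files still get an (empty) partition,
    // which keeps partition index == bucket id for downstream joins.
    Seq.tabulate(numBuckets) { id => BucketPartition(id, grouped.getOrElse(id, Nil)) }
  }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      BucketedFile("part-r-00000-abc-0.parquet", 10L),
      BucketedFile("part-r-00001-abc-1.parquet", 12L),
      BucketedFile("part-r-00002-abc-1.parquet", 8L))
    groupByBucket(files, numBuckets = 4).foreach { p =>
      println(s"bucket ${p.bucketId}: ${p.files.map(_.fileName).mkString(", ")}")
    }
  }
}
```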
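Second, `createNonBucketedReadRDD` slices splittable files at `maxSplitBytes` boundaries and then bin-packs the resulting splits with the greedy decreasing-size loop the patch moves out of `apply` (the code comment calls it "First Fit Decreasing"). Here is a minimal standalone sketch of both steps, assuming made-up `Split`/`Bin` types and `sliceFile`/`packSplits` helpers in place of Spark's `PartitionedFile`/`FilePartition`:

```scala
object SplitAndPackSketch {
  import scala.collection.mutable.ArrayBuffer

  // Hypothetical stand-ins: a split is a (path, offset, length) slice of a file,
  // a bin is one read task (one RDD partition).
  case class Split(path: String, offset: Long, length: Long)
  case class Bin(index: Int, splits: Seq[Split])

  /** Slice a splittable file into chunks of at most maxSplitBytes, like the
   *  `(0L until file.getLen by maxSplitBytes)` step in createNonBucketedReadRDD. */
  def sliceFile(path: String, fileLen: Long, maxSplitBytes: Long): Seq[Split] =
    (0L until fileLen by maxSplitBytes).map { offset =>
      Split(path, offset, math.min(maxSplitBytes, fileLen - offset))
    }

  /** Greedily pack splits into bins: sort by length descending, then keep appending
   *  to the current bin until adding the next split (plus a fixed per-file open
   *  cost) would push it past maxSplitBytes. */
  def packSplits(splits: Seq[Split], maxSplitBytes: Long, openCostInBytes: Long): Seq[Bin] = {
    val bins = new ArrayBuffer[Bin]
    val current = new ArrayBuffer[Split]
    var currentSize = 0L

    def closeBin(): Unit = {
      if (current.nonEmpty) bins.append(Bin(bins.size, current.toArray.toSeq))
      current.clear()
      currentSize = 0
    }

    splits.sortBy(_.length)(implicitly[Ordering[Long]].reverse).foreach { split =>
      if (currentSize + split.length > maxSplitBytes) closeBin()
      currentSize += split.length + openCostInBytes
      current.append(split)
    }
    closeBin()
    bins.toSeq
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // A 200 MB file gets sliced into 128 MB + 72 MB chunks; smaller files ride along.
    val splits = sliceFile("a.parquet", 200 * mb, 128 * mb) ++
      Seq(Split("b.parquet", 0, 40 * mb), Split("c.parquet", 0, 5 * mb))
    packSplits(splits, maxSplitBytes = 128 * mb, openCostInBytes = 4 * mb).foreach { b =>
      println(s"bin ${b.index}: " +
        b.splits.map(s => s"${s.path}@${s.offset}+${s.length}").mkString(", "))
    }
  }
}
```

Sorting by length descending first means large splits seed bins on their own while small files fill the remaining space, and the `openCostInBytes` term keeps a task from accumulating many tiny files whose open overhead would dominate the scan.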