path: root/sql
author     Reynold Xin <rxin@databricks.com>  2016-06-22 18:19:07 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-06-22 18:19:07 -0700
commit     37f3be5d29192db0a54f6c4699237b149bd0ecae (patch)
tree       58dc6d4922e3fd2730eeab6334c4916e3189e8e3 /sql
parent     9f990fa3f9e0b798d8018cf4132b93a3468f33bb (diff)
[SPARK-16159][SQL] Move RDD creation logic from FileSourceStrategy.apply
## What changes were proposed in this pull request?

We embed partitioning logic in FileSourceStrategy.apply, making the function very long. This is a small refactoring to move it into its own functions. Eventually we would be able to move the partitioning functions into a physical operator, rather than doing it in physical planning.

## How was this patch tested?

This is a simple code move.

Author: Reynold Xin <rxin@databricks.com>

Closes #13862 from rxin/SPARK-16159.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala        |  26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 240
2 files changed, 154 insertions, 112 deletions
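
Before diving into the diff, the sketch below mirrors the shape of the change described in the commit message: the partition-planning logic that used to sit inline in `FileSourceStrategy.apply` is pulled out into one helper for bucketed reads and one for non-bucketed reads. This is a minimal, standalone sketch under simplified assumptions; `Relation`, `Split`, and `planScan` are hypothetical stand-ins, not Spark's actual types.

```scala
// A self-contained illustration of the refactoring shape only; not Spark code.
object FileSourceStrategySketch {
  case class Split(path: String, start: Long, length: Long)
  case class Relation(files: Seq[(String, Long)], numBuckets: Option[Int])

  // Analogue of FileSourceStrategy.apply after the change: dispatch, don't inline.
  def planScan(relation: Relation, maxSplitBytes: Long): Seq[Seq[Split]] =
    relation.numBuckets match {
      case Some(n) => createBucketedSplits(relation, n)
      case None    => createNonBucketedSplits(relation, maxSplitBytes)
    }

  // One group of splits per bucket id (here derived from a hash of the file name).
  private def createBucketedSplits(relation: Relation, numBuckets: Int): Seq[Seq[Split]] = {
    val splits = relation.files.map { case (path, len) => Split(path, 0L, len) }
    val byBucket = splits.groupBy(s => math.abs(s.path.hashCode) % numBuckets)
    (0 until numBuckets).map(id => byBucket.getOrElse(id, Nil))
  }

  // Chop each file into blocks of at most maxSplitBytes bytes.
  private def createNonBucketedSplits(relation: Relation, maxSplitBytes: Long): Seq[Seq[Split]] =
    relation.files.map { case (path, len) =>
      (0L until len by maxSplitBytes).map { offset =>
        Split(path, offset, math.min(maxSplitBytes, len - offset))
      }
    }
}
```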
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index f7f68b1eb9..1443057d5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -27,9 +27,14 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatch
/**
- * A single file that should be read, along with partition column values that
- * need to be prepended to each row. The reading should start at the first
- * valid record found after `start`.
+ * A part (i.e. "block") of a single file that should be read, along with partition column values
+ * that need to be prepended to each row.
+ *
+ * @param partitionValues values of partition columns to be prepended to each row.
+ * @param filePath path of the file to read
+ * @param start the beginning offset (in bytes) of the block.
+ * @param length number of bytes to read.
+ * @param locations locality information (list of nodes that have the data).
*/
case class PartitionedFile(
partitionValues: InternalRow,
@@ -43,13 +48,14 @@ case class PartitionedFile(
}
/**
- * A collection of files that should be read as a single task possibly from multiple partitioned
- * directories.
- *
- * TODO: This currently does not take locality information about the files into account.
+ * A collection of file blocks that should be read as a single task
+ * (possibly from multiple partitioned directories).
*/
case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition
+/**
+ * An RDD that scans a list of file partitions.
+ */
class FileScanRDD(
@transient private val sparkSession: SparkSession,
readFunction: (PartitionedFile) => Iterator[InternalRow],
@@ -88,8 +94,8 @@ class FileScanRDD(
private[this] var currentFile: PartitionedFile = null
private[this] var currentIterator: Iterator[Object] = null
- def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
- def next() = {
+ def hasNext: Boolean = (currentIterator != null && currentIterator.hasNext) || nextIterator()
+ def next(): Object = {
val nextElement = currentIterator.next()
// TODO: we should have a better separation of row based and batch based scan, so that we
// don't need to run this `if` for every record.
@@ -120,7 +126,7 @@ class FileScanRDD(
}
}
- override def close() = {
+ override def close(): Unit = {
updateBytesRead()
updateBytesReadWithFileSize()
InputFileNameHolder.unsetInputFileName()
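
The doc comments above describe the data model this file defines: a `PartitionedFile` is one byte range ("block") of a file plus the partition values to prepend to each row, and a `FilePartition` groups several such blocks into a single read task. To make that nesting concrete, here is a toy sketch with simplified stand-in types; a plain `Map` replaces `InternalRow`, and the path and host names are made up.

```scala
// Simplified stand-ins for PartitionedFile / FilePartition; field names follow the
// real case classes above, but this is an illustration, not Spark code.
case class PartitionedFileSketch(
    partitionValues: Map[String, String], // partition column values prepended to each row
    filePath: String,
    start: Long,                          // beginning offset (bytes) of this block
    length: Long,                         // number of bytes to read
    locations: Seq[String])               // hosts that have the data

// One read task: a group of file blocks, possibly from several partition directories.
case class FilePartitionSketch(index: Int, files: Seq[PartitionedFileSketch])

object FilePartitionDemo {
  def main(args: Array[String]): Unit = {
    // A single task that reads two 64 MB blocks of one file under date=2016-06-22/.
    val blockSize = 64L * 1024 * 1024
    val blocks = Seq(0L, blockSize).map { offset =>
      PartitionedFileSketch(
        partitionValues = Map("date" -> "2016-06-22"),
        filePath = "hdfs:///warehouse/t/date=2016-06-22/part-00000.parquet",
        start = offset,
        length = blockSize,
        locations = Seq("node1", "node2"))
    }
    println(FilePartitionSketch(index = 0, files = blocks))
  }
}
```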
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 13a86bfb38..04f166f8ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -22,8 +22,9 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -57,7 +58,7 @@ import org.apache.spark.sql.execution.SparkPlan
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projects, filters,
- l @ LogicalRelation(files: HadoopFsRelation, _, table)) =>
+ l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
@@ -77,14 +78,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
}
val partitionColumns =
- l.resolve(files.partitionSchema, files.sparkSession.sessionState.analyzer.resolver)
+ l.resolve(
+ fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
val dataColumns =
- l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
+ l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
// Partition keys are not available in the statistics of the files.
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -93,7 +95,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val afterScanFilters = filterSet -- partitionKeyFilters
logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
- val selectedPartitions = files.location.listFiles(partitionKeyFilters.toSeq)
+ val selectedPartitions = fsRelation.location.listFiles(partitionKeyFilters.toSeq)
val filterAttributes = AttributeSet(afterScanFilters)
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
@@ -109,113 +111,35 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
- val readFile = files.fileFormat.buildReaderWithPartitionValues(
- sparkSession = files.sparkSession,
- dataSchema = files.dataSchema,
- partitionSchema = files.partitionSchema,
- requiredSchema = prunedDataSchema,
- filters = pushedDownFilters,
- options = files.options,
- hadoopConf = files.sparkSession.sessionState.newHadoopConfWithOptions(files.options))
-
- val plannedPartitions = files.bucketSpec match {
- case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
- logInfo(s"Planning with ${bucketing.numBuckets} buckets")
- val bucketed =
- selectedPartitions.flatMap { p =>
- p.files.map { f =>
- val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
- PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
- }
- }.groupBy { f =>
- BucketingUtils
- .getBucketId(new Path(f.filePath).getName)
- .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
- }
-
- (0 until bucketing.numBuckets).map { bucketId =>
- FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
- }
-
+ val readFile: (PartitionedFile) => Iterator[InternalRow] =
+ fsRelation.fileFormat.buildReaderWithPartitionValues(
+ sparkSession = fsRelation.sparkSession,
+ dataSchema = fsRelation.dataSchema,
+ partitionSchema = fsRelation.partitionSchema,
+ requiredSchema = prunedDataSchema,
+ filters = pushedDownFilters,
+ options = fsRelation.options,
+ hadoopConf =
+ fsRelation.sparkSession.sessionState.newHadoopConfWithOptions(fsRelation.options))
+
+ val rdd = fsRelation.bucketSpec match {
+ case Some(bucketing) if fsRelation.sparkSession.sessionState.conf.bucketingEnabled =>
+ createBucketedReadRDD(bucketing, readFile, selectedPartitions, fsRelation)
case _ =>
- val defaultMaxSplitBytes = files.sparkSession.sessionState.conf.filesMaxPartitionBytes
- val openCostInBytes = files.sparkSession.sessionState.conf.filesOpenCostInBytes
- val defaultParallelism = files.sparkSession.sparkContext.defaultParallelism
- val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
- val bytesPerCore = totalBytes / defaultParallelism
- val maxSplitBytes = Math.min(defaultMaxSplitBytes,
- Math.max(openCostInBytes, bytesPerCore))
- logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
- s"open cost is considered as scanning $openCostInBytes bytes.")
-
- val splitFiles = selectedPartitions.flatMap { partition =>
- partition.files.flatMap { file =>
- val blockLocations = getBlockLocations(file)
- if (files.fileFormat.isSplitable(files.sparkSession, files.options, file.getPath)) {
- (0L until file.getLen by maxSplitBytes).map { offset =>
- val remaining = file.getLen - offset
- val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
- val hosts = getBlockHosts(blockLocations, offset, size)
- PartitionedFile(
- partition.values, file.getPath.toUri.toString, offset, size, hosts)
- }
- } else {
- val hosts = getBlockHosts(blockLocations, 0, file.getLen)
- Seq(PartitionedFile(
- partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
- }
- }
- }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-
- val partitions = new ArrayBuffer[FilePartition]
- val currentFiles = new ArrayBuffer[PartitionedFile]
- var currentSize = 0L
-
- /** Add the given file to the current partition. */
- def addFile(file: PartitionedFile): Unit = {
- currentSize += file.length + openCostInBytes
- currentFiles.append(file)
- }
-
- /** Close the current partition and move to the next. */
- def closePartition(): Unit = {
- if (currentFiles.nonEmpty) {
- val newPartition =
- FilePartition(
- partitions.size,
- currentFiles.toArray.toSeq) // Copy to a new Array.
- partitions.append(newPartition)
- }
- currentFiles.clear()
- currentSize = 0
- }
-
- // Assign files to partitions using "First Fit Decreasing" (FFD)
- // TODO: consider adding a slop factor here?
- splitFiles.foreach { file =>
- if (currentSize + file.length > maxSplitBytes) {
- closePartition()
- }
- addFile(file)
- }
- closePartition()
- partitions
+ createNonBucketedReadRDD(readFile, selectedPartitions, fsRelation)
}
val meta = Map(
- "Format" -> files.fileFormat.toString,
+ "Format" -> fsRelation.fileFormat.toString,
"ReadSchema" -> prunedDataSchema.simpleString,
PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
- INPUT_PATHS -> files.location.paths.mkString(", "))
+ INPUT_PATHS -> fsRelation.location.paths.mkString(", "))
val scan =
DataSourceScanExec.create(
readDataColumns ++ partitionColumns,
- new FileScanRDD(
- files.sparkSession,
- readFile,
- plannedPartitions),
- files,
+ rdd,
+ fsRelation,
meta,
table)
@@ -232,6 +156,118 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
case _ => Nil
}
+ /**
+ * Create an RDD for bucketed reads.
+ * The non-bucketed variant of this function is [[createNonBucketedReadRDD]].
+ *
+ * The algorithm is pretty simple: each RDD partition being returned should include all the files
+ * with the same bucket id from all the given Hive partitions.
+ *
+ * @param bucketSpec the bucketing spec.
+ * @param readFile a function to read each (part of a) file.
+ * @param selectedPartitions Hive-style partitions that are part of the read.
+ * @param fsRelation [[HadoopFsRelation]] associated with the read.
+ */
+ private def createBucketedReadRDD(
+ bucketSpec: BucketSpec,
+ readFile: (PartitionedFile) => Iterator[InternalRow],
+ selectedPartitions: Seq[Partition],
+ fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+ logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
+ val bucketed =
+ selectedPartitions.flatMap { p =>
+ p.files.map { f =>
+ val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
+ PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
+ }
+ }.groupBy { f =>
+ BucketingUtils
+ .getBucketId(new Path(f.filePath).getName)
+ .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+ }
+
+ val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+ FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
+ }
+
+ new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+ }
+
+ /**
+ * Create an RDD for non-bucketed reads.
+ * The bucketed variant of this function is [[createBucketedReadRDD]].
+ *
+ * @param readFile a function to read each (part of a) file.
+ * @param selectedPartitions Hive-style partitions that are part of the read.
+ * @param fsRelation [[HadoopFsRelation]] associated with the read.
+ */
+ private def createNonBucketedReadRDD(
+ readFile: (PartitionedFile) => Iterator[InternalRow],
+ selectedPartitions: Seq[Partition],
+ fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+ val defaultMaxSplitBytes =
+ fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
+ val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+ val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
+ val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+ val bytesPerCore = totalBytes / defaultParallelism
+
+ val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+ s"open cost is considered as scanning $openCostInBytes bytes.")
+
+ val splitFiles = selectedPartitions.flatMap { partition =>
+ partition.files.flatMap { file =>
+ val blockLocations = getBlockLocations(file)
+ if (fsRelation.fileFormat.isSplitable(
+ fsRelation.sparkSession, fsRelation.options, file.getPath)) {
+ (0L until file.getLen by maxSplitBytes).map { offset =>
+ val remaining = file.getLen - offset
+ val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+ val hosts = getBlockHosts(blockLocations, offset, size)
+ PartitionedFile(
+ partition.values, file.getPath.toUri.toString, offset, size, hosts)
+ }
+ } else {
+ val hosts = getBlockHosts(blockLocations, 0, file.getLen)
+ Seq(PartitionedFile(
+ partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
+ }
+ }
+ }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+ val partitions = new ArrayBuffer[FilePartition]
+ val currentFiles = new ArrayBuffer[PartitionedFile]
+ var currentSize = 0L
+
+ /** Close the current partition and move to the next. */
+ def closePartition(): Unit = {
+ if (currentFiles.nonEmpty) {
+ val newPartition =
+ FilePartition(
+ partitions.size,
+ currentFiles.toArray.toSeq) // Copy to a new Array.
+ partitions.append(newPartition)
+ }
+ currentFiles.clear()
+ currentSize = 0
+ }
+
+ // Assign files to partitions using "First Fit Decreasing" (FFD)
+ // TODO: consider adding a slop factor here?
+ splitFiles.foreach { file =>
+ if (currentSize + file.length > maxSplitBytes) {
+ closePartition()
+ }
+ // Add the given file to the current partition.
+ currentSize += file.length + openCostInBytes
+ currentFiles.append(file)
+ }
+ closePartition()
+
+ new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+ }
+
private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
case f: LocatedFileStatus => f.getBlockLocations
case f => Array.empty[BlockLocation]
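
For reference, the packing loop added in `createNonBucketedReadRDD` above boils down to the small routine sketched here: splits are taken largest first and appended to the current partition until adding another one would exceed `maxSplitBytes`, at which point the partition is closed and a new one started. This is a minimal, standalone illustration; the object name, the `pack` helper, and the sizes in the demo are assumptions made for the example, and only the greedy "First Fit Decreasing"-style loop mirrors the patch.

```scala
import scala.collection.mutable.ArrayBuffer

// Standalone sketch of the bin-packing step in createNonBucketedReadRDD.
object BinPackingSketch {
  def pack(splitSizes: Seq[Long], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
    val partitions = ArrayBuffer.empty[Seq[Long]]
    val currentFiles = ArrayBuffer.empty[Long]
    var currentSize = 0L

    // Close the current partition and start a new one.
    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) partitions += currentFiles.toList
      currentFiles.clear()
      currentSize = 0L
    }

    // "First Fit Decreasing": place the largest splits first.
    splitSizes.sorted(Ordering[Long].reverse).foreach { size =>
      if (currentSize + size > maxSplitBytes) closePartition()
      currentFiles += size
      currentSize += size + openCostInBytes // every split also pays a fixed "open" cost
    }
    closePartition()
    partitions.toSeq
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    // Assumed numbers: 128 MB target partitions, 4 MB open cost, a mix of split sizes.
    val packed = pack(Seq(200L, 90L, 60L, 30L, 10L).map(_ * mb), 128 * mb, 4 * mb)
    packed.zipWithIndex.foreach { case (sizes, i) =>
      println(s"partition $i: ${sizes.map(_ / mb).mkString(", ")} MB")
    }
  }
}
```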