From ac2a26d09e10c3f462ec773c3ebaa6eedae81ac0 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Thu, 4 Aug 2016 11:22:55 -0700
Subject: [SPARK-16884] Move DataSourceScanExec out of ExistingRDD.scala file

## What changes were proposed in this pull request?

This moves DataSourceScanExec into its own file so that it is more
discoverable, now that it no longer necessarily depends on an existing RDD.

cc davies

## How was this patch tested?

Existing tests.

Author: Eric Liang

Closes #14487 from ericl/split-scan.
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 521 +++++++++++++++++++++
 .../apache/spark/sql/execution/ExistingRDD.scala   | 505 +-------------------
 .../execution/datasources/DataSourceStrategy.scala |   3 +-
 3 files changed, 525 insertions(+), 504 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
new file mode 100644
index 0000000000..1e749b3dfc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.Utils
+
+private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+  val relation: BaseRelation
+  val metastoreTableIdentifier: Option[TableIdentifier]
+
+  override val nodeName: String = {
+    s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
+  }
+}
+
+/** Physical plan node for scanning data from a relation.
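+ * Rows from relations that do not already produce [[UnsafeRow]]s are converted with an
+ * [[UnsafeProjection]] before being returned (see `outputUnsafeRows` below).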
*/ +private[sql] case class RowDataSourceScanExec( + output: Seq[Attribute], + rdd: RDD[InternalRow], + @transient relation: BaseRelation, + override val outputPartitioning: Partitioning, + override val metadata: Map[String, String], + override val metastoreTableIdentifier: Option[TableIdentifier]) + extends DataSourceScanExec { + + private[sql] override lazy val metrics = + Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + + val outputUnsafeRows = relation match { + case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] => + !SparkSession.getActiveSession.get.sessionState.conf.getConf( + SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + case _: HadoopFsRelation => true + case _ => false + } + + protected override def doExecute(): RDD[InternalRow] = { + val unsafeRow = if (outputUnsafeRows) { + rdd + } else { + rdd.mapPartitionsInternal { iter => + val proj = UnsafeProjection.create(schema) + iter.map(proj) + } + } + + val numOutputRows = longMetric("numOutputRows") + unsafeRow.map { r => + numOutputRows += 1 + r + } + } + + override def simpleString: String = { + val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { + key + ": " + StringUtils.abbreviate(value, 100) + } + + s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" + + s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}" + } + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + rdd :: Nil + } + + override protected def doProduce(ctx: CodegenContext): String = { + val numOutputRows = metricTerm(ctx, "numOutputRows") + // PhysicalRDD always just has one input + val input = ctx.freshName("input") + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + val exprRows = output.zipWithIndex.map{ case (a, i) => + new BoundReference(i, a.dataType, a.nullable) + } + val row = ctx.freshName("row") + ctx.INPUT_ROW = row + ctx.currentVars = null + val columnsRowInput = exprRows.map(_.genCode(ctx)) + val inputRow = if (outputUnsafeRows) row else null + s""" + |while ($input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | $numOutputRows.add(1); + | ${consume(ctx, columnsRowInput, inputRow).trim} + | if (shouldStop()) return; + |} + """.stripMargin + } + + // Ignore rdd when checking results + override def sameResult(plan: SparkPlan): Boolean = plan match { + case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata + case _ => false + } +} + +/** + * Physical plan node for scanning data from HadoopFsRelations. + * + * @param relation The file-based relation to scan. + * @param output Output attributes of the scan. + * @param outputSchema Output schema of the scan. + * @param partitionFilters Predicates to use for partition pruning. + * @param dataFilters Data source filters to use for filtering data within partitions. 
+ * @param metastoreTableIdentifier identifier for the table in the metastore, if any.
+ */
+private[sql] case class FileSourceScanExec(
+    @transient relation: HadoopFsRelation,
+    output: Seq[Attribute],
+    outputSchema: StructType,
+    partitionFilters: Seq[Expression],
+    dataFilters: Seq[Filter],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
+  extends DataSourceScanExec {
+
+  val supportsBatch = relation.fileFormat.supportBatch(
+    relation.sparkSession, StructType.fromAttributes(output))
+
+  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
+  } else {
+    false
+  }
+
+  override val outputPartitioning: Partitioning = {
+    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
+      relation.bucketSpec
+    } else {
+      None
+    }
+    bucketSpec.map { spec =>
+      val numBuckets = spec.numBuckets
+      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
+        output.find(_.name == n)
+      }
+      if (bucketColumns.size == spec.bucketColumnNames.size) {
+        HashPartitioning(bucketColumns, numBuckets)
+      } else {
+        UnknownPartitioning(0)
+      }
+    }.getOrElse {
+      UnknownPartitioning(0)
+    }
+  }
+
+  // These metadata values make scan plans uniquely identifiable for equality checking.
+  override val metadata: Map[String, String] = Map(
+    "Format" -> relation.fileFormat.toString,
+    "ReadSchema" -> outputSchema.catalogString,
+    "Batched" -> supportsBatch.toString,
+    "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
+    "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
+    "InputPaths" -> relation.location.paths.mkString(", "))
+
+  private lazy val inputRDD: RDD[InternalRow] = {
+    val selectedPartitions = relation.location.listFiles(partitionFilters)
+
+    val readFile: (PartitionedFile) => Iterator[InternalRow] =
+      relation.fileFormat.buildReaderWithPartitionValues(
+        sparkSession = relation.sparkSession,
+        dataSchema = relation.dataSchema,
+        partitionSchema = relation.partitionSchema,
+        requiredSchema = outputSchema,
+        filters = dataFilters,
+        options = relation.options,
+        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+
+    relation.bucketSpec match {
+      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
+        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
+      case _ =>
+        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
+    }
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  private[sql] override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    if (supportsBatch) {
+      // in the case of fallback, this batched scan should never fail because of:
+      // 1) only primitive types are supported
+      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+      WholeStageCodegenExec(this).execute()
+    } else {
+      val unsafeRows = {
+        val scan = inputRDD
+        if (needsUnsafeRowConversion) {
+          scan.mapPartitionsInternal { iter =>
+            val proj = UnsafeProjection.create(schema)
+            iter.map(proj)
+          }
+        } else {
+          scan
+        }
+      }
+      val numOutputRows = longMetric("numOutputRows")
+      unsafeRows.map { r =>
+        numOutputRows += 1
+        r
+      }
+    }
+  }
+
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " +
StringUtils.abbreviate(value, 100) + } + val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "") + s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr" + } + + override protected def doProduce(ctx: CodegenContext): String = { + if (supportsBatch) { + return doProduceVectorized(ctx) + } + val numOutputRows = metricTerm(ctx, "numOutputRows") + // PhysicalRDD always just has one input + val input = ctx.freshName("input") + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + val exprRows = output.zipWithIndex.map{ case (a, i) => + new BoundReference(i, a.dataType, a.nullable) + } + val row = ctx.freshName("row") + ctx.INPUT_ROW = row + ctx.currentVars = null + val columnsRowInput = exprRows.map(_.genCode(ctx)) + val inputRow = if (needsUnsafeRowConversion) null else row + s""" + |while ($input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | $numOutputRows.add(1); + | ${consume(ctx, columnsRowInput, inputRow).trim} + | if (shouldStop()) return; + |} + """.stripMargin + } + + // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen + // never requires UnsafeRow as input. + private def doProduceVectorized(ctx: CodegenContext): String = { + val input = ctx.freshName("input") + // PhysicalRDD always just has one input + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + + // metrics + val numOutputRows = metricTerm(ctx, "numOutputRows") + val scanTimeMetric = metricTerm(ctx, "scanTime") + val scanTimeTotalNs = ctx.freshName("scanTime") + ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") + + val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" + val batch = ctx.freshName("batch") + ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") + + val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" + val idx = ctx.freshName("batchIdx") + ctx.addMutableState("int", idx, s"$idx = 0;") + val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) + val columnAssigns = colVars.zipWithIndex.map { case (name, i) => + ctx.addMutableState(columnVectorClz, name, s"$name = null;") + s"$name = $batch.column($i);" + } + + val nextBatch = ctx.freshName("nextBatch") + ctx.addNewFunction(nextBatch, + s""" + |private void $nextBatch() throws java.io.IOException { + | long getBatchStart = System.nanoTime(); + | if ($input.hasNext()) { + | $batch = ($columnarBatchClz)$input.next(); + | $numOutputRows.add($batch.numRows()); + | $idx = 0; + | ${columnAssigns.mkString("", "\n", "\n")} + | } + | $scanTimeTotalNs += System.nanoTime() - getBatchStart; + |}""".stripMargin) + + ctx.currentVars = null + val rowidx = ctx.freshName("rowIdx") + val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => + genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) + } + s""" + |if ($batch == null) { + | $nextBatch(); + |} + |while ($batch != null) { + | int numRows = $batch.numRows(); + | while ($idx < numRows) { + | int $rowidx = $idx++; + | ${consume(ctx, columnsBatchInput).trim} + | if (shouldStop()) return; + | } + | $batch = null; + | $nextBatch(); + |} + |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); + |$scanTimeTotalNs = 0; + """.stripMargin + } + + private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String, + dataType: DataType, nullable: Boolean): ExprCode = { + val javaType = ctx.javaType(dataType) + val value = 
ctx.getValue(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
+      s"""
+        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      s"$javaType ${valueVar} = $value;"
+    }).trim
+    ExprCode(code, isNullVar, valueVar)
+  }
+
+  /**
+   * Create an RDD for bucketed reads.
+   * The non-bucketed variant of this function is [[createNonBucketedReadRDD]].
+   *
+   * The algorithm is pretty simple: each RDD partition being returned should include all the files
+   * with the same bucket id from all the given Hive partitions.
+   *
+   * @param bucketSpec the bucketing spec.
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partitions that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
+   */
+  private def createBucketedReadRDD(
+      bucketSpec: BucketSpec,
+      readFile: (PartitionedFile) => Iterator[InternalRow],
+      selectedPartitions: Seq[Partition],
+      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
+    val bucketed =
+      selectedPartitions.flatMap { p =>
+        p.files.map { f =>
+          val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
+          PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
+        }
+      }.groupBy { f =>
+        BucketingUtils
+          .getBucketId(new Path(f.filePath).getName)
+          .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+      }
+
+    val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+      FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
+    }
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+  }
+
+  /**
+   * Create an RDD for non-bucketed reads.
+   * The bucketed variant of this function is [[createBucketedReadRDD]].
+   *
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partitions that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
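+   *
+   * The maximum split size used here is
+   * `Math.min(filesMaxPartitionBytes, Math.max(filesOpenCostInBytes, totalBytes / defaultParallelism))`,
+   * so small scans still spread across the default parallelism while splits never drop below
+   * the configured per-file open cost.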
+   */
+  private def createNonBucketedReadRDD(
+      readFile: (PartitionedFile) => Iterator[InternalRow],
+      selectedPartitions: Seq[Partition],
+      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+    val defaultMaxSplitBytes =
+      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
+    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+      s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    val splitFiles = selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { file =>
+        val blockLocations = getBlockLocations(file)
+        if (fsRelation.fileFormat.isSplitable(
+            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
+          (0L until file.getLen by maxSplitBytes).map { offset =>
+            val remaining = file.getLen - offset
+            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+            val hosts = getBlockHosts(blockLocations, offset, size)
+            PartitionedFile(
+              partition.values, file.getPath.toUri.toString, offset, size, hosts)
+          }
+        } else {
+          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
+          Seq(PartitionedFile(
+            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
+        }
+      }
+    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    /** Close the current partition and move to the next. */
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        val newPartition =
+          FilePartition(
+            partitions.size,
+            currentFiles.toArray.toSeq) // Copy to a new Array.
+        partitions.append(newPartition)
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    // Assign files to partitions using "First Fit Decreasing" (FFD)
+    // TODO: consider adding a slop factor here?
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles.append(file)
+    }
+    closePartition()
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+  }
+
+  private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
+    case f: LocatedFileStatus => f.getBlockLocations
+    case f => Array.empty[BlockLocation]
+  }
+
+  // Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)`
+  // pair that represents a segment of the same file, finds the block that contains the largest
+  // fraction of the segment, and returns the location hosts of that block. If no such block can
+  // be found, returns an empty array.
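+  // For example, given two 64MB blocks b0 = [0, 64MB) and b1 = [64MB, 128MB), a segment with
+  // offset = 32MB and length = 48MB overlaps 32MB of b0 but only 16MB of b1, so the hosts of
+  // b0 are returned.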
+ private def getBlockHosts( + blockLocations: Array[BlockLocation], offset: Long, length: Long): Array[String] = { + val candidates = blockLocations.map { + // The fragment starts from a position within this block + case b if b.getOffset <= offset && offset < b.getOffset + b.getLength => + b.getHosts -> (b.getOffset + b.getLength - offset).min(length) + + // The fragment ends at a position within this block + case b if offset <= b.getOffset && offset + length < b.getLength => + b.getHosts -> (offset + length - b.getOffset).min(length) + + // The fragment fully contains this block + case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length => + b.getHosts -> b.getLength + + // The fragment doesn't intersect with this block + case b => + b.getHosts -> 0L + }.filter { case (hosts, size) => + size > 0L + } + + if (candidates.isEmpty) { + Array.empty[String] + } else { + val (hosts, _) = candidates.maxBy { case (_, size) => size } + hosts + } + } + + override def sameResult(plan: SparkPlan): Boolean = plan match { + case other: FileSourceScanExec => + val thisPredicates = partitionFilters.map(cleanExpression) + val otherPredicates = other.partitionFilters.map(cleanExpression) + val result = relation == other.relation && metadata == other.metadata && + thisPredicates.length == otherPredicates.length && + thisPredicates.zip(otherPredicates).forall(p => p._1.semanticEquals(p._2)) + result + case _ => false + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 79d9114ff3..b762c16914 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -17,26 +17,15 @@ package org.apache.spark.sql.execution -import scala.collection.mutable.ArrayBuffer - -import org.apache.commons.lang3.StringUtils -import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} - import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, Encoder, Row, SparkSession, SQLContext} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} +import org.apache.spark.sql.{Encoder, Row, SparkSession} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.{BaseRelation, Filter} -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.DataType import org.apache.spark.util.Utils object RDDConversions { @@ -189,491 +178,3 @@ private[sql] case class RDDScanExec( s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}" } } - -private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport { - val relation: BaseRelation - val metastoreTableIdentifier: 
Option[TableIdentifier] - - override val nodeName: String = { - s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}" - } -} - -/** Physical plan node for scanning data from a relation. */ -private[sql] case class RowDataSourceScanExec( - output: Seq[Attribute], - rdd: RDD[InternalRow], - @transient relation: BaseRelation, - override val outputPartitioning: Partitioning, - override val metadata: Map[String, String], - override val metastoreTableIdentifier: Option[TableIdentifier]) - extends DataSourceScanExec { - - private[sql] override lazy val metrics = - Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) - - val outputUnsafeRows = relation match { - case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] => - !SparkSession.getActiveSession.get.sessionState.conf.getConf( - SQLConf.PARQUET_VECTORIZED_READER_ENABLED) - case _: HadoopFsRelation => true - case _ => false - } - - protected override def doExecute(): RDD[InternalRow] = { - val unsafeRow = if (outputUnsafeRows) { - rdd - } else { - rdd.mapPartitionsInternal { iter => - val proj = UnsafeProjection.create(schema) - iter.map(proj) - } - } - - val numOutputRows = longMetric("numOutputRows") - unsafeRow.map { r => - numOutputRows += 1 - r - } - } - - override def simpleString: String = { - val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { - key + ": " + StringUtils.abbreviate(value, 100) - } - - s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" + - s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}" - } - - override def inputRDDs(): Seq[RDD[InternalRow]] = { - rdd :: Nil - } - - override protected def doProduce(ctx: CodegenContext): String = { - val numOutputRows = metricTerm(ctx, "numOutputRows") - // PhysicalRDD always just has one input - val input = ctx.freshName("input") - ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") - val exprRows = output.zipWithIndex.map{ case (a, i) => - new BoundReference(i, a.dataType, a.nullable) - } - val row = ctx.freshName("row") - ctx.INPUT_ROW = row - ctx.currentVars = null - val columnsRowInput = exprRows.map(_.genCode(ctx)) - val inputRow = if (outputUnsafeRows) row else null - s""" - |while ($input.hasNext()) { - | InternalRow $row = (InternalRow) $input.next(); - | $numOutputRows.add(1); - | ${consume(ctx, columnsRowInput, inputRow).trim} - | if (shouldStop()) return; - |} - """.stripMargin - } - - // Ignore rdd when checking results - override def sameResult(plan: SparkPlan): Boolean = plan match { - case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata - case _ => false - } -} - -/** - * Physical plan node for scanning data from HadoopFsRelations. - * - * @param relation The file-based relation to scan. - * @param output Output attributes of the scan. - * @param outputSchema Output schema of the scan. - * @param partitionFilters Predicates to use for partition pruning. - * @param dataFilters Data source filters to use for filtering data within partitions. 
- * @param metastoreTableIdentifier - */ -private[sql] case class FileSourceScanExec( - @transient relation: HadoopFsRelation, - output: Seq[Attribute], - outputSchema: StructType, - partitionFilters: Seq[Expression], - dataFilters: Seq[Filter], - override val metastoreTableIdentifier: Option[TableIdentifier]) - extends DataSourceScanExec { - - val supportsBatch = relation.fileFormat.supportBatch( - relation.sparkSession, StructType.fromAttributes(output)) - - val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) { - SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled - } else { - false - } - - override val outputPartitioning: Partitioning = { - val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) { - relation.bucketSpec - } else { - None - } - bucketSpec.map { spec => - val numBuckets = spec.numBuckets - val bucketColumns = spec.bucketColumnNames.flatMap { n => - output.find(_.name == n) - } - if (bucketColumns.size == spec.bucketColumnNames.size) { - HashPartitioning(bucketColumns, numBuckets) - } else { - UnknownPartitioning(0) - } - }.getOrElse { - UnknownPartitioning(0) - } - } - - // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( - "Format" -> relation.fileFormat.toString, - "ReadSchema" -> outputSchema.catalogString, - "Batched" -> supportsBatch.toString, - "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), - DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"), - DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", ")) - - private lazy val inputRDD: RDD[InternalRow] = { - val selectedPartitions = relation.location.listFiles(partitionFilters) - - val readFile: (PartitionedFile) => Iterator[InternalRow] = - relation.fileFormat.buildReaderWithPartitionValues( - sparkSession = relation.sparkSession, - dataSchema = relation.dataSchema, - partitionSchema = relation.partitionSchema, - requiredSchema = outputSchema, - filters = dataFilters, - options = relation.options, - hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) - - relation.bucketSpec match { - case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled => - createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation) - case _ => - createNonBucketedReadRDD(readFile, selectedPartitions, relation) - } - } - - override def inputRDDs(): Seq[RDD[InternalRow]] = { - inputRDD :: Nil - } - - private[sql] override lazy val metrics = - Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) - - protected override def doExecute(): RDD[InternalRow] = { - if (supportsBatch) { - // in the case of fallback, this batched scan should never fail because of: - // 1) only primitive types are supported - // 2) the number of columns should be smaller than spark.sql.codegen.maxFields - WholeStageCodegenExec(this).execute() - } else { - val unsafeRows = { - val scan = inputRDD - if (needsUnsafeRowConversion) { - scan.mapPartitionsInternal { iter => - val proj = UnsafeProjection.create(schema) - iter.map(proj) - } - } else { - scan - } - } - val numOutputRows = longMetric("numOutputRows") - unsafeRows.map { r => - numOutputRows += 1 - r - } - } - } - - override def simpleString: String = { - val metadataEntries = for ((key, value) <- 
metadata.toSeq.sorted) yield { - key + ": " + StringUtils.abbreviate(value, 100) - } - val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "") - s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr" - } - - override protected def doProduce(ctx: CodegenContext): String = { - if (supportsBatch) { - return doProduceVectorized(ctx) - } - val numOutputRows = metricTerm(ctx, "numOutputRows") - // PhysicalRDD always just has one input - val input = ctx.freshName("input") - ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") - val exprRows = output.zipWithIndex.map{ case (a, i) => - new BoundReference(i, a.dataType, a.nullable) - } - val row = ctx.freshName("row") - ctx.INPUT_ROW = row - ctx.currentVars = null - val columnsRowInput = exprRows.map(_.genCode(ctx)) - val inputRow = if (needsUnsafeRowConversion) null else row - s""" - |while ($input.hasNext()) { - | InternalRow $row = (InternalRow) $input.next(); - | $numOutputRows.add(1); - | ${consume(ctx, columnsRowInput, inputRow).trim} - | if (shouldStop()) return; - |} - """.stripMargin - } - - // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen - // never requires UnsafeRow as input. - private def doProduceVectorized(ctx: CodegenContext): String = { - val input = ctx.freshName("input") - // PhysicalRDD always just has one input - ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") - - // metrics - val numOutputRows = metricTerm(ctx, "numOutputRows") - val scanTimeMetric = metricTerm(ctx, "scanTime") - val scanTimeTotalNs = ctx.freshName("scanTime") - ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") - - val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" - val batch = ctx.freshName("batch") - ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") - - val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" - val idx = ctx.freshName("batchIdx") - ctx.addMutableState("int", idx, s"$idx = 0;") - val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) - val columnAssigns = colVars.zipWithIndex.map { case (name, i) => - ctx.addMutableState(columnVectorClz, name, s"$name = null;") - s"$name = $batch.column($i);" - } - - val nextBatch = ctx.freshName("nextBatch") - ctx.addNewFunction(nextBatch, - s""" - |private void $nextBatch() throws java.io.IOException { - | long getBatchStart = System.nanoTime(); - | if ($input.hasNext()) { - | $batch = ($columnarBatchClz)$input.next(); - | $numOutputRows.add($batch.numRows()); - | $idx = 0; - | ${columnAssigns.mkString("", "\n", "\n")} - | } - | $scanTimeTotalNs += System.nanoTime() - getBatchStart; - |}""".stripMargin) - - ctx.currentVars = null - val rowidx = ctx.freshName("rowIdx") - val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => - genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) - } - s""" - |if ($batch == null) { - | $nextBatch(); - |} - |while ($batch != null) { - | int numRows = $batch.numRows(); - | while ($idx < numRows) { - | int $rowidx = $idx++; - | ${consume(ctx, columnsBatchInput).trim} - | if (shouldStop()) return; - | } - | $batch = null; - | $nextBatch(); - |} - |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); - |$scanTimeTotalNs = 0; - """.stripMargin - } - - private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String, - dataType: DataType, nullable: Boolean): ExprCode = { - val javaType = 
ctx.javaType(dataType) - val value = ctx.getValue(columnVar, dataType, ordinal) - val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" } - val valueVar = ctx.freshName("value") - val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]" - val code = s"${ctx.registerComment(str)}\n" + (if (nullable) { - s""" - boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal); - $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value); - """ - } else { - s"$javaType ${valueVar} = $value;" - }).trim - ExprCode(code, isNullVar, valueVar) - } - - /** - * Create an RDD for bucketed reads. - * The non-bucketed variant of this function is [[createNonBucketedReadRDD]]. - * - * The algorithm is pretty simple: each RDD partition being returned should include all the files - * with the same bucket id from all the given Hive partitions. - * - * @param bucketSpec the bucketing spec. - * @param readFile a function to read each (part of a) file. - * @param selectedPartitions Hive-style partition that are part of the read. - * @param fsRelation [[HadoopFsRelation]] associated with the read. - */ - private def createBucketedReadRDD( - bucketSpec: BucketSpec, - readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Seq[Partition], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { - logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") - val bucketed = - selectedPartitions.flatMap { p => - p.files.map { f => - val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen) - PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts) - } - }.groupBy { f => - BucketingUtils - .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) - } - - val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId => - FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil)) - } - - new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) - } - - /** - * Create an RDD for non-bucketed reads. - * The bucketed variant of this function is [[createBucketedReadRDD]]. - * - * @param readFile a function to read each (part of a) file. - * @param selectedPartitions Hive-style partition that are part of the read. - * @param fsRelation [[HadoopFsRelation]] associated with the read. 
- */ - private def createNonBucketedReadRDD( - readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Seq[Partition], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { - val defaultMaxSplitBytes = - fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes - val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes - val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism - val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum - val bytesPerCore = totalBytes / defaultParallelism - - val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) - logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + - s"open cost is considered as scanning $openCostInBytes bytes.") - - val splitFiles = selectedPartitions.flatMap { partition => - partition.files.flatMap { file => - val blockLocations = getBlockLocations(file) - if (fsRelation.fileFormat.isSplitable( - fsRelation.sparkSession, fsRelation.options, file.getPath)) { - (0L until file.getLen by maxSplitBytes).map { offset => - val remaining = file.getLen - offset - val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining - val hosts = getBlockHosts(blockLocations, offset, size) - PartitionedFile( - partition.values, file.getPath.toUri.toString, offset, size, hosts) - } - } else { - val hosts = getBlockHosts(blockLocations, 0, file.getLen) - Seq(PartitionedFile( - partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts)) - } - } - }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) - - val partitions = new ArrayBuffer[FilePartition] - val currentFiles = new ArrayBuffer[PartitionedFile] - var currentSize = 0L - - /** Close the current partition and move to the next. */ - def closePartition(): Unit = { - if (currentFiles.nonEmpty) { - val newPartition = - FilePartition( - partitions.size, - currentFiles.toArray.toSeq) // Copy to a new Array. - partitions.append(newPartition) - } - currentFiles.clear() - currentSize = 0 - } - - // Assign files to partitions using "First Fit Decreasing" (FFD) - // TODO: consider adding a slop factor here? - splitFiles.foreach { file => - if (currentSize + file.length > maxSplitBytes) { - closePartition() - } - // Add the given file to the current partition. - currentSize += file.length + openCostInBytes - currentFiles.append(file) - } - closePartition() - - new FileScanRDD(fsRelation.sparkSession, readFile, partitions) - } - - private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match { - case f: LocatedFileStatus => f.getBlockLocations - case f => Array.empty[BlockLocation] - } - - // Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)` - // pair that represents a segment of the same file, find out the block that contains the largest - // fraction the segment, and returns location hosts of that block. If no such block can be found, - // returns an empty array. 
- private def getBlockHosts( - blockLocations: Array[BlockLocation], offset: Long, length: Long): Array[String] = { - val candidates = blockLocations.map { - // The fragment starts from a position within this block - case b if b.getOffset <= offset && offset < b.getOffset + b.getLength => - b.getHosts -> (b.getOffset + b.getLength - offset).min(length) - - // The fragment ends at a position within this block - case b if offset <= b.getOffset && offset + length < b.getLength => - b.getHosts -> (offset + length - b.getOffset).min(length) - - // The fragment fully contains this block - case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length => - b.getHosts -> b.getLength - - // The fragment doesn't intersect with this block - case b => - b.getHosts -> 0L - }.filter { case (hosts, size) => - size > 0L - } - - if (candidates.isEmpty) { - Array.empty[String] - } else { - val (hosts, _) = candidates.maxBy { case (_, size) => size } - hosts - } - } - - override def sameResult(plan: SparkPlan): Boolean = plan match { - case other: FileSourceScanExec => - val thisPredicates = partitionFilters.map(cleanExpression) - val otherPredicates = other.partitionFilters.map(cleanExpression) - val result = relation == other.relation && metadata == other.metadata && - thisPredicates.length == otherPredicates.length && - thisPredicates.zip(otherPredicates).forall(p => p._1.semanticEquals(p._2)) - result - case _ => false - } -} - -private[sql] object DataSourceScanExec { - // Metadata keys - val INPUT_PATHS = "InputPaths" - val PUSHED_FILTERS = "PushedFilters" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 52b1677d7c..ed8ccca6de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} -import org.apache.spark.sql.execution.DataSourceScanExec.PUSHED_FILTERS import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -361,7 +360,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val markedFilters = for (filter <- pushedFilters) yield { if (handledFilters.contains(filter)) s"*$filter" else s"$filter" } - pairs += (PUSHED_FILTERS -> markedFilters.mkString("[", ", ", "]")) + pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]")) } pairs.toMap } -- cgit v1.2.3