Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala')
 sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 258 +++++++++++---------
 1 file changed, 148 insertions(+), 110 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 815ff01c4c..392c48fb7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{AtomicType, DataType}
 
 object RDDConversions {
   def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
@@ -123,24 +123,30 @@ private[sql] case class PhysicalRDD(
   }
 }
 
-/** Physical plan node for scanning data from a relation. */
-private[sql] case class DataSourceScan(
-    output: Seq[Attribute],
-    rdd: RDD[InternalRow],
-    @transient relation: BaseRelation,
-    override val metadata: Map[String, String] = Map.empty)
-  extends LeafNode with CodegenSupport {
+private[sql] trait DataSourceScan extends LeafNode {
+  val rdd: RDD[InternalRow]
+  val relation: BaseRelation
 
   override val nodeName: String = relation.toString
 
   // Ignore rdd when checking results
-  override def sameResult(plan: SparkPlan ): Boolean = plan match {
+  override def sameResult(plan: SparkPlan): Boolean = plan match {
     case other: DataSourceScan => relation == other.relation && metadata == other.metadata
     case _ => false
   }
+}
 
-  private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class RowDataSourceScan(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String] = Map.empty)
+  extends DataSourceScan with CodegenSupport {
+
+  private[sql] override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   val outputUnsafeRows = relation match {
     case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
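The two hunks above replace the single DataSourceScan case class with a shared trait plus concrete scan nodes, and outputPartitioning becomes a constructor parameter instead of a computed member. The identity rule the trait keeps is that sameResult compares only (relation, metadata) and deliberately ignores the backing RDD, so equivalent scans still match across plan copies. A minimal sketch of that rule, with stand-in types rather than Spark's:

    // Stand-in sketch (not Spark's classes): scan identity is the relation
    // plus its metadata; the RDD instance behind the scan does not matter.
    trait ScanNode {
      def relation: String                  // stand-in for BaseRelation
      def metadata: Map[String, String]
      def sameResult(other: ScanNode): Boolean =
        relation == other.relation && metadata == other.metadata
    }

    case class RowScan(
        relation: String,
        rddId: Long,                        // differs between plan copies
        metadata: Map[String, String] = Map.empty) extends ScanNode

    // RowScan("t", rddId = 1).sameResult(RowScan("t", rddId = 2)) == true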
@@ -149,27 +155,6 @@ private[sql] case class DataSourceScan(
     case _ => false
   }
 
-  override val outputPartitioning = {
-    val bucketSpec = relation match {
-      // TODO: this should be closer to bucket planning.
-      case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
-      case _ => None
-    }
-
-    def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
-      throw new AnalysisException(s"bucket column $colName not found in existing columns " +
-        s"(${output.map(_.name).mkString(", ")})")
-    }
-
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.map(toAttribute)
-      HashPartitioning(bucketColumns, numBuckets)
-    }.getOrElse {
-      UnknownPartitioning(0)
-    }
-  }
-
   protected override def doExecute(): RDD[InternalRow] = {
     val unsafeRow = if (outputUnsafeRows) {
       rdd
@@ -196,6 +181,57 @@ private[sql] case class DataSourceScan(
     rdd :: Nil
   }
 
+  override protected def doProduce(ctx: CodegenContext): String = {
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    // PhysicalRDD always just has one input
+    val input = ctx.freshName("input")
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+    val exprRows = output.zipWithIndex.map { case (a, i) =>
+      new BoundReference(i, a.dataType, a.nullable)
+    }
+    val row = ctx.freshName("row")
+    ctx.INPUT_ROW = row
+    ctx.currentVars = null
+    val columnsRowInput = exprRows.map(_.gen(ctx))
+    val inputRow = if (outputUnsafeRows) row else null
+    s"""
+       |while ($input.hasNext()) {
+       |  InternalRow $row = (InternalRow) $input.next();
+       |  $numOutputRows.add(1);
+       |  ${consume(ctx, columnsRowInput, inputRow).trim}
+       |  if (shouldStop()) return;
+       |}
+     """.stripMargin
+  }
+}
+
+/** Physical plan node for scanning data from a batched relation. */
+private[sql] case class BatchedDataSourceScan(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String] = Map.empty)
+  extends DataSourceScan with CodegenSupport {
+
+  private[sql] override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+    val metadataStr = metadataEntries.mkString(" ", ", ", "")
+    s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
+  }
+
+  override def upstreams(): Seq[RDD[InternalRow]] = {
+    rdd :: Nil
+  }
+
   private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
     dataType: DataType, nullable: Boolean): ExprCode = {
     val javaType = ctx.javaType(dataType)
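Two things to note in the hunks above. First, the row-based doProduce emits a plain pull loop over InternalRows. Second, BatchedDataSourceScan.doExecute simply throws, because the batched node is only ever driven through the whole-stage-codegen path (upstreams plus doProduce). A hand-written Scala analogue of the loop the row scan generates, with hypothetical helper names standing in for the generated Java:

    // Analogue of the generated row-scan loop: drain the single upstream
    // iterator, count each row into the numOutputRows metric, hand the row
    // to the downstream consume(), and exit early when asked to stop.
    def rowScanLoop[Row](
        input: Iterator[Row],
        addOutputRows: Long => Unit,   // numOutputRows.add(...)
        consume: Row => Unit,          // downstream operator's consume()
        shouldStop: () => Boolean): Unit = {
      while (input.hasNext) {
        val row = input.next()
        addOutputRows(1L)
        consume(row)
        if (shouldStop()) return
      }
    }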
@@ -217,96 +253,64 @@ private[sql] case class DataSourceScan(
 
   // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
   // never requires UnsafeRow as input.
   override protected def doProduce(ctx: CodegenContext): String = {
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
     val input = ctx.freshName("input")
-    val idx = ctx.freshName("batchIdx")
-    val rowidx = ctx.freshName("rowIdx")
-    val batch = ctx.freshName("batch")
     // PhysicalRDD always just has one input
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs = ctx.freshName("scanTime")
+    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val batch = ctx.freshName("batch")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+    val idx = ctx.freshName("batchIdx")
     ctx.addMutableState("int", idx, s"$idx = 0;")
     val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
     val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
       ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = ${batch}.column($i);" }
-
-    val row = ctx.freshName("row")
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-
-    // The input RDD can either return (all) ColumnarBatches or InternalRows. We determine this
-    // by looking at the first value of the RDD and then calling the function which will process
-    // the remaining. It is faster to return batches.
-    // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
-    // here which path to use. Fix this.
+      s"$name = $batch.column($i);"
+    }
 
-    ctx.currentVars = null
-    val columns1 = (output zip colVars).map { case (attr, colVar) =>
-      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
-    val scanBatches = ctx.freshName("processBatches")
-    ctx.addNewFunction(scanBatches,
+    val nextBatch = ctx.freshName("nextBatch")
+    ctx.addNewFunction(nextBatch,
       s"""
-         | private void $scanBatches() throws java.io.IOException {
-         |   while (true) {
-         |     int numRows = $batch.numRows();
-         |     if ($idx == 0) {
-         |       ${columnAssigns.mkString("", "\n", "\n")}
-         |       $numOutputRows.add(numRows);
-         |     }
-         |
-         |     // this loop is very perf sensitive and changes to it should be measured carefully
-         |     while ($idx < numRows) {
-         |       int $rowidx = $idx++;
-         |       ${consume(ctx, columns1).trim}
-         |       if (shouldStop()) return;
-         |     }
-         |
-         |     if (!$input.hasNext()) {
-         |       $batch = null;
-         |       break;
-         |     }
-         |     $batch = ($columnarBatchClz)$input.next();
-         |     $idx = 0;
-         |   }
-         | }""".stripMargin)
-
-    val exprRows =
-      output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
-    ctx.INPUT_ROW = row
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
     ctx.currentVars = null
-    val columns2 = exprRows.map(_.gen(ctx))
-    val inputRow = if (outputUnsafeRows) row else null
-    val scanRows = ctx.freshName("processRows")
-    ctx.addNewFunction(scanRows,
-      s"""
-         | private void $scanRows(InternalRow $row) throws java.io.IOException {
-         |   boolean firstRow = true;
-         |   while (firstRow || $input.hasNext()) {
-         |     if (firstRow) {
-         |       firstRow = false;
-         |     } else {
-         |       $row = (InternalRow) $input.next();
-         |     }
-         |     $numOutputRows.add(1);
-         |     ${consume(ctx, columns2, inputRow).trim}
-         |     if (shouldStop()) return;
-         |   }
-         | }""".stripMargin)
-
-    val value = ctx.freshName("value")
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+    }
     s"""
-       | if ($batch != null) {
-       |   $scanBatches();
-       | } else if ($input.hasNext()) {
-       |   Object $value = $input.next();
-       |   if ($value instanceof $columnarBatchClz) {
-       |     $batch = ($columnarBatchClz)$value;
-       |     $scanBatches();
-       |   } else {
-       |     $scanRows((InternalRow) $value);
-       |   }
-       | }
+       |if ($batch == null) {
+       |  $nextBatch();
+       |}
+       |while ($batch != null) {
+       |  int numRows = $batch.numRows();
+       |  while ($idx < numRows) {
+       |    int $rowidx = $idx++;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    if (shouldStop()) return;
+       |  }
+       |  $batch = null;
+       |  $nextBatch();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
     """.stripMargin
   }
 }
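This hunk collapses the old dual-path codegen (processBatches for ColumnarBatch input, processRows as the row fallback) into a single batch loop: nextBatch fetches and times the next ColumnarBatch, and the main loop walks each batch by row index. A compact Scala analogue of that control flow, with stand-in types since the real code is generated Java over ColumnarBatch:

    // Analogue of the generated batch loop: fetch a batch (timing only the
    // fetch), iterate its rows by index, then fetch the next batch until
    // input is exhausted; finally fold the accumulated nanos into the metric.
    final class BatchScanLoop[B](
        input: Iterator[B],
        numRows: B => Int,
        consumeRow: (B, Int) => Unit,
        addScanTimeMs: Long => Unit) {

      private var batch: Option[B] = None
      private var idx = 0
      private var scanTimeTotalNs = 0L

      private def nextBatch(): Unit = {
        val start = System.nanoTime()
        if (input.hasNext) { batch = Some(input.next()); idx = 0 }
        else batch = None
        scanTimeTotalNs += System.nanoTime() - start
      }

      def run(): Unit = {
        if (batch.isEmpty) nextBatch()
        while (batch.isDefined) {
          val b = batch.get
          val n = numRows(b)
          while (idx < n) { consumeRow(b, idx); idx += 1 }
          batch = None
          nextBatch()
        }
        addScanTimeMs(scanTimeTotalNs / (1000 * 1000))  // ns -> ms
        scanTimeTotalNs = 0
      }
    }

Confining the timing to nextBatch keeps System.nanoTime() out of the per-row inner loop, which the removed comment called "very perf sensitive"; only whole-batch boundaries pay for the metric.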
+ s"$name = $batch.column($i);" + } - ctx.currentVars = null - val columns1 = (output zip colVars).map { case (attr, colVar) => - genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) } - val scanBatches = ctx.freshName("processBatches") - ctx.addNewFunction(scanBatches, + val nextBatch = ctx.freshName("nextBatch") + ctx.addNewFunction(nextBatch, s""" - | private void $scanBatches() throws java.io.IOException { - | while (true) { - | int numRows = $batch.numRows(); - | if ($idx == 0) { - | ${columnAssigns.mkString("", "\n", "\n")} - | $numOutputRows.add(numRows); - | } - | - | // this loop is very perf sensitive and changes to it should be measured carefully - | while ($idx < numRows) { - | int $rowidx = $idx++; - | ${consume(ctx, columns1).trim} - | if (shouldStop()) return; - | } - | - | if (!$input.hasNext()) { - | $batch = null; - | break; - | } - | $batch = ($columnarBatchClz)$input.next(); - | $idx = 0; - | } - | }""".stripMargin) - - val exprRows = - output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable)) - ctx.INPUT_ROW = row + |private void $nextBatch() throws java.io.IOException { + | long getBatchStart = System.nanoTime(); + | if ($input.hasNext()) { + | $batch = ($columnarBatchClz)$input.next(); + | $numOutputRows.add($batch.numRows()); + | $idx = 0; + | ${columnAssigns.mkString("", "\n", "\n")} + | } + | $scanTimeTotalNs += System.nanoTime() - getBatchStart; + |}""".stripMargin) + ctx.currentVars = null - val columns2 = exprRows.map(_.gen(ctx)) - val inputRow = if (outputUnsafeRows) row else null - val scanRows = ctx.freshName("processRows") - ctx.addNewFunction(scanRows, - s""" - | private void $scanRows(InternalRow $row) throws java.io.IOException { - | boolean firstRow = true; - | while (firstRow || $input.hasNext()) { - | if (firstRow) { - | firstRow = false; - | } else { - | $row = (InternalRow) $input.next(); - | } - | $numOutputRows.add(1); - | ${consume(ctx, columns2, inputRow).trim} - | if (shouldStop()) return; - | } - | }""".stripMargin) - - val value = ctx.freshName("value") + val rowidx = ctx.freshName("rowIdx") + val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => + genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) + } s""" - | if ($batch != null) { - | $scanBatches(); - | } else if ($input.hasNext()) { - | Object $value = $input.next(); - | if ($value instanceof $columnarBatchClz) { - | $batch = ($columnarBatchClz)$value; - | $scanBatches(); - | } else { - | $scanRows((InternalRow) $value); - | } - | } + |if ($batch == null) { + | $nextBatch(); + |} + |while ($batch != null) { + | int numRows = $batch.numRows(); + | while ($idx < numRows) { + | int $rowidx = $idx++; + | ${consume(ctx, columnsBatchInput).trim} + | if (shouldStop()) return; + | } + | $batch = null; + | $nextBatch(); + |} + |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); + |$scanTimeTotalNs = 0; """.stripMargin } } @@ -315,4 +319,38 @@ private[sql] object DataSourceScan { // Metadata keys val INPUT_PATHS = "InputPaths" val PUSHED_FILTERS = "PushedFilters" + + def create( + output: Seq[Attribute], + rdd: RDD[InternalRow], + relation: BaseRelation, + metadata: Map[String, String] = Map.empty): DataSourceScan = { + val outputPartitioning = { + val bucketSpec = relation match { + // TODO: this should be closer to bucket planning. 
+ case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec + case _ => None + } + + def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { + throw new AnalysisException(s"bucket column $colName not found in existing columns " + + s"(${output.map(_.name).mkString(", ")})") + } + + bucketSpec.map { spec => + val numBuckets = spec.numBuckets + val bucketColumns = spec.bucketColumnNames.map(toAttribute) + HashPartitioning(bucketColumns, numBuckets) + }.getOrElse { + UnknownPartitioning(0) + } + } + + relation match { + case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, relation.schema) => + BatchedDataSourceScan(output, rdd, relation, outputPartitioning, metadata) + case _ => + RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata) + } + } } |
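The new DataSourceScan.create factory centralizes the construction decisions that previously lived in the node itself: derive outputPartitioning from the relation's bucket spec (HashPartitioning over the resolved bucket columns when bucketing is enabled, so joins and aggregations on those keys can avoid a shuffle; UnknownPartitioning otherwise), then pick BatchedDataSourceScan only when the file format supports ColumnarBatch. A condensed, hypothetical sketch of those two decisions with simplified stand-in types:

    sealed trait Partitioning
    case class HashPartitioning(columns: Seq[String], numBuckets: Int) extends Partitioning
    case object UnknownPartitioning extends Partitioning

    case class BucketSpec(numBuckets: Int, bucketColumnNames: Seq[String])

    // Sketch of create(): resolve bucket columns against the scan's output,
    // fall back to UnknownPartitioning, then pick the node by batch support.
    def createScan(
        output: Seq[String],
        bucketSpec: Option[BucketSpec],
        supportsBatch: Boolean): (Partitioning, String) = {
      val partitioning: Partitioning = bucketSpec match {
        case Some(spec) =>
          val cols = spec.bucketColumnNames.map { c =>
            require(output.contains(c), s"bucket column $c not found in (${output.mkString(", ")})")
            c
          }
          HashPartitioning(cols, spec.numBuckets)
        case None => UnknownPartitioning
      }
      (partitioning, if (supportsBatch) "BatchedDataSourceScan" else "RowDataSourceScan")
    }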