Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala  258
1 file changed, 148 insertions(+), 110 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 815ff01c4c..392c48fb7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => Parq
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{AtomicType, DataType}
object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
@@ -123,24 +123,30 @@ private[sql] case class PhysicalRDD(
}
}
-/** Physical plan node for scanning data from a relation. */
-private[sql] case class DataSourceScan(
- output: Seq[Attribute],
- rdd: RDD[InternalRow],
- @transient relation: BaseRelation,
- override val metadata: Map[String, String] = Map.empty)
- extends LeafNode with CodegenSupport {
+private[sql] trait DataSourceScan extends LeafNode {
+ val rdd: RDD[InternalRow]
+ val relation: BaseRelation
override val nodeName: String = relation.toString
// Ignore rdd when checking results
- override def sameResult(plan: SparkPlan ): Boolean = plan match {
+ override def sameResult(plan: SparkPlan): Boolean = plan match {
case other: DataSourceScan => relation == other.relation && metadata == other.metadata
case _ => false
}
+}
- private[sql] override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class RowDataSourceScan(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ @transient relation: BaseRelation,
+ override val outputPartitioning: Partitioning,
+ override val metadata: Map[String, String] = Map.empty)
+ extends DataSourceScan with CodegenSupport {
+
+ private[sql] override lazy val metrics =
+ Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
val outputUnsafeRows = relation match {
case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
@@ -149,27 +155,6 @@ private[sql] case class DataSourceScan(
case _ => false
}
- override val outputPartitioning = {
- val bucketSpec = relation match {
- // TODO: this should be closer to bucket planning.
- case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
- case _ => None
- }
-
- def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
- throw new AnalysisException(s"bucket column $colName not found in existing columns " +
- s"(${output.map(_.name).mkString(", ")})")
- }
-
- bucketSpec.map { spec =>
- val numBuckets = spec.numBuckets
- val bucketColumns = spec.bucketColumnNames.map(toAttribute)
- HashPartitioning(bucketColumns, numBuckets)
- }.getOrElse {
- UnknownPartitioning(0)
- }
- }
-
protected override def doExecute(): RDD[InternalRow] = {
val unsafeRow = if (outputUnsafeRows) {
rdd
@@ -196,6 +181,57 @@ private[sql] case class DataSourceScan(
rdd :: Nil
}
+ override protected def doProduce(ctx: CodegenContext): String = {
+ val numOutputRows = metricTerm(ctx, "numOutputRows")
+ // PhysicalRDD always just has one input
+ val input = ctx.freshName("input")
+ ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+ val exprRows = output.zipWithIndex.map{ case (a, i) =>
+ new BoundReference(i, a.dataType, a.nullable)
+ }
+ val row = ctx.freshName("row")
+ ctx.INPUT_ROW = row
+ ctx.currentVars = null
+ val columnsRowInput = exprRows.map(_.gen(ctx))
+ val inputRow = if (outputUnsafeRows) row else null
+ s"""
+ |while ($input.hasNext()) {
+ | InternalRow $row = (InternalRow) $input.next();
+ | $numOutputRows.add(1);
+ | ${consume(ctx, columnsRowInput, inputRow).trim}
+ | if (shouldStop()) return;
+ |}
+ """.stripMargin
+ }
+}
+
+/** Physical plan node for scanning data from a batched relation. */
+private[sql] case class BatchedDataSourceScan(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ @transient relation: BaseRelation,
+ override val outputPartitioning: Partitioning,
+ override val metadata: Map[String, String] = Map.empty)
+ extends DataSourceScan with CodegenSupport {
+
+ private[sql] override lazy val metrics =
+ Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+ "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ throw new UnsupportedOperationException
+ }
+
+ override def simpleString: String = {
+ val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+ val metadataStr = metadataEntries.mkString(" ", ", ", "")
+ s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
+ }
+
+ override def upstreams(): Seq[RDD[InternalRow]] = {
+ rdd :: Nil
+ }
+
private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
dataType: DataType, nullable: Boolean): ExprCode = {
val javaType = ctx.javaType(dataType)
@@ -217,96 +253,64 @@ private[sql] case class DataSourceScan(
// Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
// never requires UnsafeRow as input.
override protected def doProduce(ctx: CodegenContext): String = {
- val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
- val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
val input = ctx.freshName("input")
- val idx = ctx.freshName("batchIdx")
- val rowidx = ctx.freshName("rowIdx")
- val batch = ctx.freshName("batch")
// PhysicalRDD always just has one input
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+ // metrics
+ val numOutputRows = metricTerm(ctx, "numOutputRows")
+ val scanTimeMetric = metricTerm(ctx, "scanTime")
+ val scanTimeTotalNs = ctx.freshName("scanTime")
+ ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+ val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+ val batch = ctx.freshName("batch")
ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+ val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+ val idx = ctx.freshName("batchIdx")
ctx.addMutableState("int", idx, s"$idx = 0;")
val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
ctx.addMutableState(columnVectorClz, name, s"$name = null;")
- s"$name = ${batch}.column($i);" }
-
- val row = ctx.freshName("row")
- val numOutputRows = metricTerm(ctx, "numOutputRows")
-
- // The input RDD can either return (all) ColumnarBatches or InternalRows. We determine this
- // by looking at the first value of the RDD and then calling the function which will process
- // the remaining. It is faster to return batches.
- // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
- // here which path to use. Fix this.
+ s"$name = $batch.column($i);"
+ }
- ctx.currentVars = null
- val columns1 = (output zip colVars).map { case (attr, colVar) =>
- genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
- val scanBatches = ctx.freshName("processBatches")
- ctx.addNewFunction(scanBatches,
+ val nextBatch = ctx.freshName("nextBatch")
+ ctx.addNewFunction(nextBatch,
s"""
- | private void $scanBatches() throws java.io.IOException {
- | while (true) {
- | int numRows = $batch.numRows();
- | if ($idx == 0) {
- | ${columnAssigns.mkString("", "\n", "\n")}
- | $numOutputRows.add(numRows);
- | }
- |
- | // this loop is very perf sensitive and changes to it should be measured carefully
- | while ($idx < numRows) {
- | int $rowidx = $idx++;
- | ${consume(ctx, columns1).trim}
- | if (shouldStop()) return;
- | }
- |
- | if (!$input.hasNext()) {
- | $batch = null;
- | break;
- | }
- | $batch = ($columnarBatchClz)$input.next();
- | $idx = 0;
- | }
- | }""".stripMargin)
-
- val exprRows =
- output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
- ctx.INPUT_ROW = row
+ |private void $nextBatch() throws java.io.IOException {
+ | long getBatchStart = System.nanoTime();
+ | if ($input.hasNext()) {
+ | $batch = ($columnarBatchClz)$input.next();
+ | $numOutputRows.add($batch.numRows());
+ | $idx = 0;
+ | ${columnAssigns.mkString("", "\n", "\n")}
+ | }
+ | $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+ |}""".stripMargin)
+
ctx.currentVars = null
- val columns2 = exprRows.map(_.gen(ctx))
- val inputRow = if (outputUnsafeRows) row else null
- val scanRows = ctx.freshName("processRows")
- ctx.addNewFunction(scanRows,
- s"""
- | private void $scanRows(InternalRow $row) throws java.io.IOException {
- | boolean firstRow = true;
- | while (firstRow || $input.hasNext()) {
- | if (firstRow) {
- | firstRow = false;
- | } else {
- | $row = (InternalRow) $input.next();
- | }
- | $numOutputRows.add(1);
- | ${consume(ctx, columns2, inputRow).trim}
- | if (shouldStop()) return;
- | }
- | }""".stripMargin)
-
- val value = ctx.freshName("value")
+ val rowidx = ctx.freshName("rowIdx")
+ val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+ genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+ }
s"""
- | if ($batch != null) {
- | $scanBatches();
- | } else if ($input.hasNext()) {
- | Object $value = $input.next();
- | if ($value instanceof $columnarBatchClz) {
- | $batch = ($columnarBatchClz)$value;
- | $scanBatches();
- | } else {
- | $scanRows((InternalRow) $value);
- | }
- | }
+ |if ($batch == null) {
+ | $nextBatch();
+ |}
+ |while ($batch != null) {
+ | int numRows = $batch.numRows();
+ | while ($idx < numRows) {
+ | int $rowidx = $idx++;
+ | ${consume(ctx, columnsBatchInput).trim}
+ | if (shouldStop()) return;
+ | }
+ | $batch = null;
+ | $nextBatch();
+ |}
+ |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+ |$scanTimeTotalNs = 0;
""".stripMargin
}
}
@@ -315,4 +319,38 @@ private[sql] object DataSourceScan {
// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"
+
+ def create(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ relation: BaseRelation,
+ metadata: Map[String, String] = Map.empty): DataSourceScan = {
+ val outputPartitioning = {
+ val bucketSpec = relation match {
+ // TODO: this should be closer to bucket planning.
+ case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
+ case _ => None
+ }
+
+ def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
+ throw new AnalysisException(s"bucket column $colName not found in existing columns " +
+ s"(${output.map(_.name).mkString(", ")})")
+ }
+
+ bucketSpec.map { spec =>
+ val numBuckets = spec.numBuckets
+ val bucketColumns = spec.bucketColumnNames.map(toAttribute)
+ HashPartitioning(bucketColumns, numBuckets)
+ }.getOrElse {
+ UnknownPartitioning(0)
+ }
+ }
+
+ relation match {
+ case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, relation.schema) =>
+ BatchedDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+ case _ =>
+ RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+ }
+ }
}
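
A minimal, self-contained sketch of the trait-plus-factory dispatch that DataSourceScan.create adopts above: two concrete scan implementations behind a common trait, selected by a single capability check. The names below (SimpleRelation, RowScan, BatchedScan, Scan.create) are simplified stand-ins for illustration, not Spark's actual classes.

// Illustrative sketch only; assumes simplified stand-in types, not Spark's API.
trait SimpleRelation {
  def supportsBatch: Boolean   // stand-in for fileFormat.supportBatch(...)
}

sealed trait Scan {
  def relation: SimpleRelation
}

final case class RowScan(relation: SimpleRelation) extends Scan
final case class BatchedScan(relation: SimpleRelation) extends Scan

object Scan {
  // Mirrors the shape of DataSourceScan.create: pick the columnar (batched)
  // scan only when the relation can produce batches, otherwise fall back to rows.
  def create(relation: SimpleRelation): Scan =
    if (relation.supportsBatch) BatchedScan(relation) else RowScan(relation)
}

object ScanDispatchExample extends App {
  val columnarSource = new SimpleRelation { val supportsBatch = true }
  val rowSource = new SimpleRelation { val supportsBatch = false }
  println(Scan.create(columnarSource)) // BatchedScan(...)
  println(Scan.create(rowSource))      // RowScan(...)
}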