Diffstat (limited to 'sql/core'):
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 157
 1 file changed, 94 insertions(+), 63 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 815ff01c4c..ab575e90c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.toCommentSafeString
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
@@ -139,8 +139,12 @@ private[sql] case class DataSourceScan(
case _ => false
}
- private[sql] override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+ private[sql] override lazy val metrics = if (canProcessBatches()) {
+ Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+ "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+ } else {
+ Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+ }
val outputUnsafeRows = relation match {
case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
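
Note on the hunk above: the "scanTime" timing metric is only registered when the scan can process ColumnarBatches; row-based scans keep just "numOutputRows". A hedged sketch of how one might inspect which metrics a scan node ends up with. Since `metrics` is declared private[sql], this only compiles from code inside the org.apache.spark.sql package (for example a test suite there); the DataFrame `df` and its Parquet source are assumed, not taken from the patch.

    import org.apache.spark.sql.execution.DataSourceScan

    // Collect the scan nodes from the executed physical plan of an assumed DataFrame `df`.
    val scanNodes = df.queryExecution.executedPlan.collect {
      case scan: DataSourceScan => scan
    }

    // With the vectorized Parquet reader and whole-stage codegen enabled, the scan
    // should expose "scanTime" alongside "numOutputRows"; otherwise only the latter.
    scanNodes.foreach(scan => println(scan.metrics.keys.mkString(", ")))
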
@@ -170,6 +174,17 @@ private[sql] case class DataSourceScan(
}
}
+ private def canProcessBatches(): Boolean = {
+ relation match {
+ case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] &&
+ SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) &&
+ SQLContext.getActive().get.conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED) =>
+ true
+ case _ =>
+ false
+ }
+ }
+
protected override def doExecute(): RDD[InternalRow] = {
val unsafeRow = if (outputUnsafeRows) {
rdd
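
The new canProcessBatches() helper above only returns true for Parquet relations when both the vectorized Parquet reader and whole-stage codegen are enabled. A minimal sketch of flipping those switches from user code; the string keys are my assumption of what SQLConf.PARQUET_VECTORIZED_READER_ENABLED and SQLConf.WHOLESTAGE_CODEGEN_ENABLED map to, so treat them as illustrative rather than authoritative.

    // Assumed config keys for the two flags the predicate checks.
    sqlContext.setConf("spark.sql.parquet.enableVectorizedReader", "true")
    sqlContext.setConf("spark.sql.codegen.wholeStage", "true")

    // Disabling either flag makes canProcessBatches() return false, so the scan
    // falls back to the row-at-a-time path and no "scan time" metric is registered.
    sqlContext.setConf("spark.sql.parquet.enableVectorizedReader", "false")
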
@@ -241,73 +256,89 @@ private[sql] case class DataSourceScan(
// TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
// here which path to use. Fix this.
- ctx.currentVars = null
- val columns1 = (output zip colVars).map { case (attr, colVar) =>
- genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
- val scanBatches = ctx.freshName("processBatches")
- ctx.addNewFunction(scanBatches,
- s"""
- | private void $scanBatches() throws java.io.IOException {
- | while (true) {
- | int numRows = $batch.numRows();
- | if ($idx == 0) {
- | ${columnAssigns.mkString("", "\n", "\n")}
- | $numOutputRows.add(numRows);
- | }
- |
- | // this loop is very perf sensitive and changes to it should be measured carefully
- | while ($idx < numRows) {
- | int $rowidx = $idx++;
- | ${consume(ctx, columns1).trim}
- | if (shouldStop()) return;
- | }
- |
- | if (!$input.hasNext()) {
- | $batch = null;
- | break;
- | }
- | $batch = ($columnarBatchClz)$input.next();
- | $idx = 0;
- | }
- | }""".stripMargin)
-
val exprRows =
- output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
+ output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
ctx.INPUT_ROW = row
ctx.currentVars = null
- val columns2 = exprRows.map(_.gen(ctx))
+ val columnsRowInput = exprRows.map(_.gen(ctx))
val inputRow = if (outputUnsafeRows) row else null
val scanRows = ctx.freshName("processRows")
ctx.addNewFunction(scanRows,
s"""
- | private void $scanRows(InternalRow $row) throws java.io.IOException {
- | boolean firstRow = true;
- | while (firstRow || $input.hasNext()) {
- | if (firstRow) {
- | firstRow = false;
- | } else {
- | $row = (InternalRow) $input.next();
- | }
- | $numOutputRows.add(1);
- | ${consume(ctx, columns2, inputRow).trim}
- | if (shouldStop()) return;
- | }
- | }""".stripMargin)
-
- val value = ctx.freshName("value")
- s"""
- | if ($batch != null) {
- | $scanBatches();
- | } else if ($input.hasNext()) {
- | Object $value = $input.next();
- | if ($value instanceof $columnarBatchClz) {
- | $batch = ($columnarBatchClz)$value;
- | $scanBatches();
- | } else {
- | $scanRows((InternalRow) $value);
- | }
- | }
- """.stripMargin
+ | private void $scanRows(InternalRow $row) throws java.io.IOException {
+ | boolean firstRow = true;
+ | while (!shouldStop() && (firstRow || $input.hasNext())) {
+ | if (firstRow) {
+ | firstRow = false;
+ | } else {
+ | $row = (InternalRow) $input.next();
+ | }
+ | $numOutputRows.add(1);
+ | ${consume(ctx, columnsRowInput, inputRow).trim}
+ | }
+ | }""".stripMargin)
+
+ // Timers for how long we spent inside the scan. We can only maintain this when using batches,
+ // otherwise the overhead is too high.
+ if (canProcessBatches()) {
+ val scanTimeMetric = metricTerm(ctx, "scanTime")
+ val getBatchStart = ctx.freshName("scanStart")
+ val scanTimeTotalNs = ctx.freshName("scanTime")
+ ctx.currentVars = null
+ val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+ genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
+ val scanBatches = ctx.freshName("processBatches")
+ ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+ ctx.addNewFunction(scanBatches,
+ s"""
+ | private void $scanBatches() throws java.io.IOException {
+ | while (true) {
+ | int numRows = $batch.numRows();
+ | if ($idx == 0) {
+ | ${columnAssigns.mkString("", "\n", "\n")}
+ | $numOutputRows.add(numRows);
+ | }
+ |
+ | while (!shouldStop() && $idx < numRows) {
+ | int $rowidx = $idx++;
+ | ${consume(ctx, columnsBatchInput).trim}
+ | }
+ | if (shouldStop()) return;
+ |
+ | long $getBatchStart = System.nanoTime();
+ | if (!$input.hasNext()) {
+ | $batch = null;
+ | $scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+ | break;
+ | }
+ | $batch = ($columnarBatchClz)$input.next();
+ | $scanTimeTotalNs += System.nanoTime() - $getBatchStart;
+ | $idx = 0;
+ | }
+ | }""".stripMargin)
+
+ val value = ctx.freshName("value")
+ s"""
+ | if ($batch != null) {
+ | $scanBatches();
+ | } else if ($input.hasNext()) {
+ | Object $value = $input.next();
+ | if ($value instanceof $columnarBatchClz) {
+ | $batch = ($columnarBatchClz)$value;
+ | $scanBatches();
+ | } else {
+ | $scanRows((InternalRow) $value);
+ | }
+ | }
+ """.stripMargin
+ } else {
+ s"""
+ |if ($input.hasNext()) {
+ | $scanRows((InternalRow) $input.next());
+ |}
+ """.stripMargin
+ }
}
}
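
For reference, the timing scheme in the generated scanBatches code only measures the time spent pulling the next ColumnarBatch from the upstream iterator; the row-level consume loop stays outside the timed window, and the accumulated nanoseconds are reported in milliseconds once the iterator is drained. A simplified plain-Scala restatement of that idea (not the generated code itself; `process` and `reportMs` are hypothetical stand-ins for the consume loop and the SQL metric):

    def drainBatches[T](input: Iterator[T])(process: T => Unit)(reportMs: Long => Unit): Unit = {
      var scanTimeTotalNs = 0L
      var done = false
      while (!done) {
        // Time only the hasNext/next call that fetches a batch from upstream.
        val start = System.nanoTime()
        if (!input.hasNext) {
          // Iterator drained: report the total in milliseconds, as the metric expects.
          reportMs(scanTimeTotalNs / (1000 * 1000))
          done = true
        } else {
          val batch = input.next()
          scanTimeTotalNs += System.nanoTime() - start
          // Batch processing happens outside the timed region.
          process(batch)
        }
      }
    }
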