Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 157
1 file changed, 94 insertions, 63 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 815ff01c4c..ab575e90c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.toCommentSafeString
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
@@ -139,8 +139,12 @@ private[sql] case class DataSourceScan(
     case _ => false
   }
 
-  private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+  private[sql] override lazy val metrics = if (canProcessBatches()) {
+    Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+  } else {
+    Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+  }
 
   val outputUnsafeRows = relation match {
     case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
@@ -170,6 +174,17 @@ private[sql] case class DataSourceScan(
     }
   }
 
+  private def canProcessBatches(): Boolean = {
+    relation match {
+      case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] &&
+        SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) &&
+        SQLContext.getActive().get.conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED) =>
+        true
+      case _ =>
+        false
+    }
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
     val unsafeRow = if (outputUnsafeRows) {
       rdd
@@ -241,73 +256,89 @@ private[sql] case class DataSourceScan(
     // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
     // here which path to use. Fix this.
 
-    ctx.currentVars = null
-    val columns1 = (output zip colVars).map { case (attr, colVar) =>
-      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
-    val scanBatches = ctx.freshName("processBatches")
-    ctx.addNewFunction(scanBatches,
-      s"""
-      | private void $scanBatches() throws java.io.IOException {
-      |   while (true) {
-      |     int numRows = $batch.numRows();
-      |     if ($idx == 0) {
-      |       ${columnAssigns.mkString("", "\n", "\n")}
-      |       $numOutputRows.add(numRows);
-      |     }
-      |
-      |     // this loop is very perf sensitive and changes to it should be measured carefully
-      |     while ($idx < numRows) {
-      |       int $rowidx = $idx++;
-      |       ${consume(ctx, columns1).trim}
-      |       if (shouldStop()) return;
-      |     }
-      |
-      |     if (!$input.hasNext()) {
-      |       $batch = null;
-      |       break;
-      |     }
-      |     $batch = ($columnarBatchClz)$input.next();
-      |     $idx = 0;
-      |   }
-      | }""".stripMargin)
-
     val exprRows =
-      output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
+        output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
     ctx.INPUT_ROW = row
     ctx.currentVars = null
-    val columns2 = exprRows.map(_.gen(ctx))
+    val columnsRowInput = exprRows.map(_.gen(ctx))
     val inputRow = if (outputUnsafeRows) row else null
     val scanRows = ctx.freshName("processRows")
     ctx.addNewFunction(scanRows,
       s"""
-       | private void $scanRows(InternalRow $row) throws java.io.IOException {
-       |   boolean firstRow = true;
-       |   while (firstRow || $input.hasNext()) {
-       |     if (firstRow) {
-       |       firstRow = false;
-       |     } else {
-       |       $row = (InternalRow) $input.next();
-       |     }
-       |     $numOutputRows.add(1);
-       |     ${consume(ctx, columns2, inputRow).trim}
-       |     if (shouldStop()) return;
-       |   }
-       | }""".stripMargin)
-
-    val value = ctx.freshName("value")
-    s"""
-     | if ($batch != null) {
-     |   $scanBatches();
-     | } else if ($input.hasNext()) {
-     |   Object $value = $input.next();
-     |   if ($value instanceof $columnarBatchClz) {
-     |     $batch = ($columnarBatchClz)$value;
-     |     $scanBatches();
-     |   } else {
-     |     $scanRows((InternalRow) $value);
-     |   }
-     | }
-   """.stripMargin
+       | private void $scanRows(InternalRow $row) throws java.io.IOException {
+       |   boolean firstRow = true;
+       |   while (!shouldStop() && (firstRow || $input.hasNext())) {
+       |     if (firstRow) {
+       |       firstRow = false;
+       |     } else {
+       |       $row = (InternalRow) $input.next();
+       |     }
+       |     $numOutputRows.add(1);
+       |     ${consume(ctx, columnsRowInput, inputRow).trim}
+       |   }
+       | }""".stripMargin)
+
+    // Timers for how long we spent inside the scan. We can only maintain this when using batches,
+    // otherwise the overhead is too high.
+    if (canProcessBatches()) {
+      val scanTimeMetric = metricTerm(ctx, "scanTime")
+      val getBatchStart = ctx.freshName("scanStart")
+      val scanTimeTotalNs = ctx.freshName("scanTime")
+      ctx.currentVars = null
+      val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+        genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
+      val scanBatches = ctx.freshName("processBatches")
+      ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+      ctx.addNewFunction(scanBatches,
+        s"""
+        | private void $scanBatches() throws java.io.IOException {
+        |   while (true) {
+        |     int numRows = $batch.numRows();
+        |     if ($idx == 0) {
+        |       ${columnAssigns.mkString("", "\n", "\n")}
+        |       $numOutputRows.add(numRows);
+        |     }
+        |
+        |     while (!shouldStop() && $idx < numRows) {
+        |       int $rowidx = $idx++;
+        |       ${consume(ctx, columnsBatchInput).trim}
+        |     }
+        |     if (shouldStop()) return;
+        |
+        |     long $getBatchStart = System.nanoTime();
+        |     if (!$input.hasNext()) {
+        |       $batch = null;
+        |       $scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+        |       break;
+        |     }
+        |     $batch = ($columnarBatchClz)$input.next();
+        |     $scanTimeTotalNs += System.nanoTime() - $getBatchStart;
+        |     $idx = 0;
+        |   }
+        | }""".stripMargin)
+
+      val value = ctx.freshName("value")
+      s"""
+       | if ($batch != null) {
+       |   $scanBatches();
+       | } else if ($input.hasNext()) {
+       |   Object $value = $input.next();
+       |   if ($value instanceof $columnarBatchClz) {
+       |     $batch = ($columnarBatchClz)$value;
+       |     $scanBatches();
+       |   } else {
+       |     $scanRows((InternalRow) $value);
+       |   }
+       | }
+     """.stripMargin
+    } else {
+      s"""
+       |if ($input.hasNext()) {
+       |  $scanRows((InternalRow) $input.next());
+       |}
+     """.stripMargin
+    }
   }
 }
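Note: the batch path, and with it the new "scanTime" metric, is only compiled when both SQLConf flags checked by canProcessBatches() are enabled. As a rough sketch of toggling them from a SQLContext, assuming the key strings behind SQLConf.PARQUET_VECTORIZED_READER_ENABLED and SQLConf.WHOLESTAGE_CODEGEN_ENABLED are "spark.sql.parquet.enableVectorizedReader" and "spark.sql.codegen.wholeStage" at the time of this patch:

    // Both must be true for DataSourceScan to generate the batch loop;
    // disabling either one falls back to the row-at-a-time scanRows() path,
    // which only maintains "numOutputRows".
    sqlContext.setConf("spark.sql.parquet.enableVectorizedReader", "true")
    sqlContext.setConf("spark.sql.codegen.wholeStage", "true")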
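The generated processBatches() charges the timer only for pulling batches from the upstream iterator ($input.hasNext()/$input.next()), not for the per-row consume loop, accumulates nanoseconds in a mutable field of the generated class (via ctx.addMutableState, so the total survives repeated calls into the function), and flushes the total to the metric in milliseconds once the input is exhausted. A minimal Scala sketch of that accounting pattern, with a stand-in Batch trait instead of the real ColumnarBatch:

    trait Batch { def numRows(): Int }  // stand-in for ColumnarBatch

    // Drains `input`, returning (row count, scan time in ms); only the
    // hasNext()/next() fetches are inside the timed region, mirroring how
    // the generated processBatches() maintains "scanTime".
    def drainBatches(input: Iterator[Batch]): (Long, Long) = {
      var numOutputRows = 0L
      var scanTimeTotalNs = 0L
      var start = System.nanoTime()
      while (input.hasNext) {
        val batch = input.next()
        scanTimeTotalNs += System.nanoTime() - start
        numOutputRows += batch.numRows()
        // the per-row consume(...) loop would run here, outside the timer
        start = System.nanoTime()
      }
      scanTimeTotalNs += System.nanoTime() - start  // final, failed hasNext()
      (numOutputRows, scanTimeTotalNs / (1000 * 1000))  // ns -> ms, as in the patch
    }

As in the generated code, the counter is only flushed once the iterator is exhausted, which keeps the per-batch overhead to two System.nanoTime() calls.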