aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorNong Li <nong@databricks.com>2016-03-28 21:37:46 -0700
committerDavies Liu <davies.liu@gmail.com>2016-03-28 21:37:46 -0700
commita180286b7994f9f9a56b84903cc9ee6057ba6624 (patch)
treeefd885a9422d46edf9e62b77bc00730b05f06c41 /sql/core
parent4a55c336397d3f138c6f5735675ec7cb272827f5 (diff)
downloadspark-a180286b7994f9f9a56b84903cc9ee6057ba6624.tar.gz
spark-a180286b7994f9f9a56b84903cc9ee6057ba6624.tar.bz2
spark-a180286b7994f9f9a56b84903cc9ee6057ba6624.zip
[SPARK-14210] [SQL] Add a metric for time spent in scans.
## What changes were proposed in this pull request? This adds a metric to parquet scans that measures the time in just the scan phase. This is only possible when the scan returns ColumnarBatches, otherwise the overhead is too high. This combined with the pipeline metric lets us easily see what percent of the time was in the scan. Author: Nong Li <nong@databricks.com> Closes #12007 from nongli/spark-14210.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala157
1 files changed, 94 insertions, 63 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 815ff01c4c..ab575e90c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.toCommentSafeString
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
@@ -139,8 +139,12 @@ private[sql] case class DataSourceScan(
case _ => false
}
- private[sql] override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+ private[sql] override lazy val metrics = if (canProcessBatches()) {
+ Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+ "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+ } else {
+ Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+ }
val outputUnsafeRows = relation match {
case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
@@ -170,6 +174,17 @@ private[sql] case class DataSourceScan(
}
}
+ private def canProcessBatches(): Boolean = {
+ relation match {
+ case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] &&
+ SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) &&
+ SQLContext.getActive().get.conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED) =>
+ true
+ case _ =>
+ false
+ }
+ }
+
protected override def doExecute(): RDD[InternalRow] = {
val unsafeRow = if (outputUnsafeRows) {
rdd
@@ -241,73 +256,89 @@ private[sql] case class DataSourceScan(
// TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
// here which path to use. Fix this.
- ctx.currentVars = null
- val columns1 = (output zip colVars).map { case (attr, colVar) =>
- genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
- val scanBatches = ctx.freshName("processBatches")
- ctx.addNewFunction(scanBatches,
- s"""
- | private void $scanBatches() throws java.io.IOException {
- | while (true) {
- | int numRows = $batch.numRows();
- | if ($idx == 0) {
- | ${columnAssigns.mkString("", "\n", "\n")}
- | $numOutputRows.add(numRows);
- | }
- |
- | // this loop is very perf sensitive and changes to it should be measured carefully
- | while ($idx < numRows) {
- | int $rowidx = $idx++;
- | ${consume(ctx, columns1).trim}
- | if (shouldStop()) return;
- | }
- |
- | if (!$input.hasNext()) {
- | $batch = null;
- | break;
- | }
- | $batch = ($columnarBatchClz)$input.next();
- | $idx = 0;
- | }
- | }""".stripMargin)
-
val exprRows =
- output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
+ output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
ctx.INPUT_ROW = row
ctx.currentVars = null
- val columns2 = exprRows.map(_.gen(ctx))
+ val columnsRowInput = exprRows.map(_.gen(ctx))
val inputRow = if (outputUnsafeRows) row else null
val scanRows = ctx.freshName("processRows")
ctx.addNewFunction(scanRows,
s"""
- | private void $scanRows(InternalRow $row) throws java.io.IOException {
- | boolean firstRow = true;
- | while (firstRow || $input.hasNext()) {
- | if (firstRow) {
- | firstRow = false;
- | } else {
- | $row = (InternalRow) $input.next();
- | }
- | $numOutputRows.add(1);
- | ${consume(ctx, columns2, inputRow).trim}
- | if (shouldStop()) return;
- | }
- | }""".stripMargin)
-
- val value = ctx.freshName("value")
- s"""
- | if ($batch != null) {
- | $scanBatches();
- | } else if ($input.hasNext()) {
- | Object $value = $input.next();
- | if ($value instanceof $columnarBatchClz) {
- | $batch = ($columnarBatchClz)$value;
- | $scanBatches();
- | } else {
- | $scanRows((InternalRow) $value);
- | }
- | }
- """.stripMargin
+ | private void $scanRows(InternalRow $row) throws java.io.IOException {
+ | boolean firstRow = true;
+ | while (!shouldStop() && (firstRow || $input.hasNext())) {
+ | if (firstRow) {
+ | firstRow = false;
+ | } else {
+ | $row = (InternalRow) $input.next();
+ | }
+ | $numOutputRows.add(1);
+ | ${consume(ctx, columnsRowInput, inputRow).trim}
+ | }
+ | }""".stripMargin)
+
+ // Timers for how long we spent inside the scan. We can only maintain this when using batches,
+ // otherwise the overhead is too high.
+ if (canProcessBatches()) {
+ val scanTimeMetric = metricTerm(ctx, "scanTime")
+ val getBatchStart = ctx.freshName("scanStart")
+ val scanTimeTotalNs = ctx.freshName("scanTime")
+ ctx.currentVars = null
+ val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+ genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
+ val scanBatches = ctx.freshName("processBatches")
+ ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+ ctx.addNewFunction(scanBatches,
+ s"""
+ | private void $scanBatches() throws java.io.IOException {
+ | while (true) {
+ | int numRows = $batch.numRows();
+ | if ($idx == 0) {
+ | ${columnAssigns.mkString("", "\n", "\n")}
+ | $numOutputRows.add(numRows);
+ | }
+ |
+ | while (!shouldStop() && $idx < numRows) {
+ | int $rowidx = $idx++;
+ | ${consume(ctx, columnsBatchInput).trim}
+ | }
+ | if (shouldStop()) return;
+ |
+ | long $getBatchStart = System.nanoTime();
+ | if (!$input.hasNext()) {
+ | $batch = null;
+ | $scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+ | break;
+ | }
+ | $batch = ($columnarBatchClz)$input.next();
+ | $scanTimeTotalNs += System.nanoTime() - $getBatchStart;
+ | $idx = 0;
+ | }
+ | }""".stripMargin)
+
+ val value = ctx.freshName("value")
+ s"""
+ | if ($batch != null) {
+ | $scanBatches();
+ | } else if ($input.hasNext()) {
+ | Object $value = $input.next();
+ | if ($value instanceof $columnarBatchClz) {
+ | $batch = ($columnarBatchClz)$value;
+ | $scanBatches();
+ | } else {
+ | $scanRows((InternalRow) $value);
+ | }
+ | }
+ """.stripMargin
+ } else {
+ s"""
+ |if ($input.hasNext()) {
+ | $scanRows((InternalRow) $input.next());
+ |}
+ """.stripMargin
+ }
}
}