author     Ala Luszczak <ala@databricks.com>    2017-02-18 07:51:41 -0800
committer  Reynold Xin <rxin@databricks.com>    2017-02-18 07:51:41 -0800
commit     b486ffc86d8ad6c303321dcf8514afee723f61f8 (patch)
tree       090b1eeb158c80cd51e6670d997351516bf22e15 /sql/core/src/main/scala/org
parent     729ce3703257aa34c00c5c8253e6971faf6a0c8d (diff)
[SPARK-19447] Make Range operator generate "recordsRead" metric
## What changes were proposed in this pull request?

The Range operator was modified to produce the "recordsRead" metric instead of "generated rows". The tests were updated and partially moved to SQLMetricsSuite.

## How was this patch tested?

Unit tests.

Author: Ala Luszczak <ala@databricks.com>

Closes #16960 from ala/range-records-read.
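For illustration, here is a minimal sketch of how the new metric becomes visible to user code through a SparkListener. This is not part of the commit; the object name RangeRecordsReadDemo, the local master, and the fixed sleep are assumptions made for the demo.

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

// Hypothetical demo object, not part of this patch.
object RangeRecordsReadDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("range-records-read-demo")
      .getOrCreate()

    // Sum recordsRead over all completed tasks.
    val recordsRead = new AtomicLong(0L)
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        recordsRead.addAndGet(taskEnd.taskMetrics.inputMetrics.recordsRead)
      }
    })

    // With this patch, each row generated by Range counts as a record read.
    spark.range(0, 1000).count()

    // Listener delivery is asynchronous; a short wait keeps the demo simple.
    Thread.sleep(2000)
    println(s"recordsRead = ${recordsRead.get}")  // expected: 1000
    spark.stop()
  }
}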
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala | 12
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index c01f9c5e3d..87e90ed685 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -365,6 +365,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val taskContext = ctx.freshName("taskContext")
ctx.addMutableState("TaskContext", taskContext, s"$taskContext = TaskContext.get();")
+ val inputMetrics = ctx.freshName("inputMetrics")
+ ctx.addMutableState("InputMetrics", inputMetrics,
+ s"$inputMetrics = $taskContext.taskMetrics().inputMetrics();")

// In order to periodically update the metrics without inflicting performance penalty, this
// operator produces elements in batches. After a batch is complete, the metrics are updated
// and a new batch is started.
@@ -460,7 +463,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| if ($nextBatchTodo == 0) break;
| }
| $numOutput.add($nextBatchTodo);
- | $numGenerated.add($nextBatchTodo);
+ | $inputMetrics.incRecordsRead($nextBatchTodo);
|
| $batchEnd += $nextBatchTodo * ${step}L;
| }
@@ -469,7 +472,6 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
- val numGeneratedRows = longMetric("numGeneratedRows")
sqlContext
.sparkContext
.parallelize(0 until numSlices, numSlices)
@@ -488,10 +490,12 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val safePartitionEnd = getSafeMargin(partitionEnd)
val rowSize = UnsafeRow.calculateBitSetWidthInBytes(1) + LongType.defaultSize
val unsafeRow = UnsafeRow.createFromByteArray(rowSize, 1)
+ val taskContext = TaskContext.get()
val iter = new Iterator[InternalRow] {
private[this] var number: Long = safePartitionStart
private[this] var overflow: Boolean = false
+ private[this] val inputMetrics = taskContext.taskMetrics().inputMetrics
override def hasNext =
if (!overflow) {
@@ -513,12 +517,12 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
}
numOutputRows += 1
- numGeneratedRows += 1
+ inputMetrics.incRecordsRead(1)
unsafeRow.setLong(0, ret)
unsafeRow
}
}
- new InterruptibleIterator(TaskContext.get(), iter)
+ new InterruptibleIterator(taskContext, iter)
}
}
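The codegen comment in the first hunk describes the batching trick: rows are produced in batches so the metric counters are bumped once per batch rather than once per row. Below is a standalone sketch of that idea in plain Scala, not Spark's actual implementation; the class and parameter names are made up for illustration.

// Produces `totalRows` values and reports progress to `onBatch` once per
// `batchSize` rows, mirroring how RangeExec amortizes metric updates.
final class BatchedCountingIterator(totalRows: Long, batchSize: Int,
    onBatch: Long => Unit) extends Iterator[Long] {
  private[this] var produced = 0L
  private[this] var sinceLastFlush = 0L

  override def hasNext: Boolean = {
    if (produced == totalRows && sinceLastFlush > 0) {
      onBatch(sinceLastFlush)  // flush the final partial batch
      sinceLastFlush = 0
    }
    produced < totalRows
  }

  override def next(): Long = {
    produced += 1
    sinceLastFlush += 1
    if (sinceLastFlush == batchSize) {
      onBatch(sinceLastFlush)  // one metric update per batch, not per row
      sinceLastFlush = 0
    }
    produced
  }
}

// Usage: ten rows with batch size three => onBatch sees 3, 3, 3, then 1.
// var read = 0L
// new BatchedCountingIterator(10L, 3, read += _).foreach(_ => ())
// assert(read == 10L)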