aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-02-10 23:23:01 -0800
committerDavies Liu <davies.liu@gmail.com>2016-02-10 23:23:01 -0800
commit8f744fe3d931c2380613b8e5bafa1bb1fd292839 (patch)
tree9f2b217223475d6cf242146b628720cfe51f0ef9 /sql
parentb5761d150b66ee0ae5f1be897d9d7a1abb039884 (diff)
downloadspark-8f744fe3d931c2380613b8e5bafa1bb1fd292839.tar.gz
spark-8f744fe3d931c2380613b8e5bafa1bb1fd292839.tar.bz2
spark-8f744fe3d931c2380613b8e5bafa1bb1fd292839.zip
[SPARK-13234] [SQL] remove duplicated SQL metrics
For lots of SQL operators, we have metrics for both of input and output, the number of input rows should be exactly the number of output rows of child, we could only have metrics for output rows. After we improved the performance using whole stage codegen, the overhead of SQL metrics are not trivial anymore, we should avoid that if it's not necessary. This PR remove all the SQL metrics for number of input rows, add SQL metric of number of output rows for all LeafNode. All remove the SQL metrics from those operators that have the same number of rows from input and output (for example, Projection, we may don't need that). The new SQL UI will looks like: ![metrics](https://cloud.githubusercontent.com/assets/40902/12965227/63614e5e-d009-11e5-88b3-84fea04f9c20.png) Author: Davies Liu <davies@databricks.com> Closes #11163 from davies/remove_metrics.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala21
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala18
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala26
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala13
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala18
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala37
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala8
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala10
24 files changed, 80 insertions, 208 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 92cfd5f841..cad7e25a32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -103,8 +104,11 @@ private[sql] case class PhysicalRDD(
override val outputPartitioning: Partitioning = UnknownPartitioning(0))
extends LeafNode {
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
protected override def doExecute(): RDD[InternalRow] = {
- if (isUnsafeRow) {
+ val unsafeRow = if (isUnsafeRow) {
rdd
} else {
rdd.mapPartitionsInternal { iter =>
@@ -112,6 +116,12 @@ private[sql] case class PhysicalRDD(
iter.map(proj)
}
}
+
+ val numOutputRows = longMetric("numOutputRows")
+ unsafeRow.map { r =>
+ numOutputRows += 1
+ r
+ }
}
override def simpleString: String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 59057bf966..f8aec9e7a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
@@ -29,6 +30,9 @@ private[sql] case class LocalTableScan(
output: Seq[Attribute],
rows: Seq[InternalRow]) extends LeafNode {
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
private val unsafeRows: Array[InternalRow] = {
val proj = UnsafeProjection.create(output, output)
rows.map(r => proj(r).copy()).toArray
@@ -36,7 +40,13 @@ private[sql] case class LocalTableScan(
private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
- protected override def doExecute(): RDD[InternalRow] = rdd
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+ rdd.map { r =>
+ numOutputRows += 1
+ r
+ }
+ }
override def executeCollect(): Array[InternalRow] = {
unsafeRows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index 06a3991459..9fcfea8381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -46,7 +46,6 @@ case class SortBasedAggregate(
AttributeSet(aggregateBufferAttributes)
override private[sql] lazy val metrics = Map(
- "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
@@ -68,7 +67,6 @@ case class SortBasedAggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
- val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsInternal { iter =>
// Because the constructor of an aggregation iterator will read at least the first row,
@@ -89,7 +87,6 @@ case class SortBasedAggregate(
resultExpressions,
(expressions, inputSchema) =>
newMutableProjection(expressions, inputSchema, subexpressionEliminationEnabled),
- numInputRows,
numOutputRows)
if (!hasInput && groupingExpressions.isEmpty) {
// There is no input and there is no grouping expressions.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 6501634ff9..8f974980bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -35,7 +35,6 @@ class SortBasedAggregationIterator(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
- numInputRows: LongSQLMetric,
numOutputRows: LongSQLMetric)
extends AggregationIterator(
groupingExpressions,
@@ -97,7 +96,6 @@ class SortBasedAggregationIterator(
val inputRow = inputIterator.next()
nextGroupingKey = groupingProjection(inputRow).copy()
firstRowInNextGroup = inputRow.copy()
- numInputRows += 1
sortedInputHasNewGroup = true
} else {
// This inputIter is empty.
@@ -122,7 +120,6 @@ class SortBasedAggregationIterator(
// Get the grouping key.
val currentRow = inputIterator.next()
val groupingKey = groupingProjection(currentRow)
- numInputRows += 1
// Check if the current row belongs the current input row.
if (currentGroupingKey == groupingKey) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 340b8f78e5..a6950f805a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -47,7 +47,6 @@ case class TungstenAggregate(
require(TungstenAggregate.supportsAggregate(aggregateBufferAttributes))
override private[sql] lazy val metrics = Map(
- "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
@@ -77,7 +76,6 @@ case class TungstenAggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
- val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
val dataSize = longMetric("dataSize")
val spillSize = longMetric("spillSize")
@@ -102,7 +100,6 @@ case class TungstenAggregate(
child.output,
iter,
testFallbackStartsAt,
- numInputRows,
numOutputRows,
dataSize,
spillSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 001e9c306a..c4f6594835 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -85,7 +85,6 @@ class TungstenAggregationIterator(
originalInputAttributes: Seq[Attribute],
inputIter: Iterator[InternalRow],
testFallbackStartsAt: Option[Int],
- numInputRows: LongSQLMetric,
numOutputRows: LongSQLMetric,
dataSize: LongSQLMetric,
spillSize: LongSQLMetric)
@@ -179,14 +178,12 @@ class TungstenAggregationIterator(
val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
while (inputIter.hasNext) {
val newInput = inputIter.next()
- numInputRows += 1
processRow(buffer, newInput)
}
} else {
var i = 0
while (inputIter.hasNext) {
val newInput = inputIter.next()
- numInputRows += 1
val groupingKey = groupingProjection.apply(newInput)
var buffer: UnsafeRow = null
if (i < fallbackStartsAt) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index f63e8a9b6d..949acb9aca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -29,9 +29,6 @@ import org.apache.spark.util.random.PoissonSampler
case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryNode with CodegenSupport {
- override private[sql] lazy val metrics = Map(
- "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
-
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override def upstream(): RDD[InternalRow] = {
@@ -55,14 +52,10 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
}
protected override def doExecute(): RDD[InternalRow] = {
- val numRows = longMetric("numRows")
child.execute().mapPartitionsInternal { iter =>
val project = UnsafeProjection.create(projectList, child.output,
subexpressionEliminationEnabled)
- iter.map { row =>
- numRows += 1
- project(row)
- }
+ iter.map(project)
}
}
@@ -74,7 +67,6 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
override def output: Seq[Attribute] = child.output
private[sql] override lazy val metrics = Map(
- "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def upstream(): RDD[InternalRow] = {
@@ -104,12 +96,10 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
}
protected override def doExecute(): RDD[InternalRow] = {
- val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsInternal { iter =>
val predicate = newPredicate(condition, child.output)
iter.filter { row =>
- numInputRows += 1
val r = predicate(row)
if (r) numOutputRows += 1
r
@@ -135,9 +125,7 @@ case class Sample(
upperBound: Double,
withReplacement: Boolean,
seed: Long,
- child: SparkPlan)
- extends UnaryNode
-{
+ child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
protected override def doExecute(): RDD[InternalRow] = {
@@ -163,6 +151,9 @@ case class Range(
output: Seq[Attribute])
extends LeafNode with CodegenSupport {
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def upstream(): RDD[InternalRow] = {
sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i))
}
@@ -241,6 +232,7 @@ case class Range(
}
protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
sqlContext
.sparkContext
.parallelize(0 until numSlices, numSlices)
@@ -283,6 +275,7 @@ case class Range(
overflow = true
}
+ numOutputRows += 1
unsafeRow.setLong(0, ret)
unsafeRow
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index 9084b74d1a..4858140229 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.storage.StorageLevel
@@ -216,6 +217,9 @@ private[sql] case class InMemoryColumnarTableScan(
@transient relation: InMemoryRelation)
extends LeafNode {
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def output: Seq[Attribute] = attributes
// The cached version does not change the outputPartitioning of the original SparkPlan.
@@ -286,6 +290,8 @@ private[sql] case class InMemoryColumnarTableScan(
private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+
if (enableAccumulators) {
readPartitions.setValue(0)
readBatches.setValue(0)
@@ -332,12 +338,18 @@ private[sql] case class InMemoryColumnarTableScan(
cachedBatchIterator
}
+ // update SQL metrics
+ val withMetrics = cachedBatchesToScan.map { batch =>
+ numOutputRows += batch.numRows
+ batch
+ }
+
val columnTypes = requestedColumnDataTypes.map {
case udt: UserDefinedType[_] => udt.sqlType
case other => other
}.toArray
val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
- columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray)
+ columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray)
if (enableAccumulators && columnarIterator.hasNext) {
readPartitions += 1
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index cbd549763a..35c7963b48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -48,8 +48,6 @@ case class BroadcastHashJoin(
extends BinaryNode with HashJoin with CodegenSupport {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
val timeout: Duration = {
@@ -70,11 +68,6 @@ case class BroadcastHashJoin(
// for the same query.
@transient
private lazy val broadcastFuture = {
- val numBuildRows = buildSide match {
- case BuildLeft => longMetric("numLeftRows")
- case BuildRight => longMetric("numRightRows")
- }
-
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
Future {
@@ -84,7 +77,6 @@ case class BroadcastHashJoin(
// Note that we use .execute().collect() because we don't want to convert data to Scala
// types
val input: Array[InternalRow] = buildPlan.execute().map { row =>
- numBuildRows += 1
row.copy()
}.collect()
// The following line doesn't run in a job so we cannot track the metric value. However, we
@@ -93,10 +85,10 @@ case class BroadcastHashJoin(
// TODO: move this check into HashedRelation
val hashed = if (canJoinKeyFitWithinLong) {
LongHashedRelation(
- input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
+ input.iterator, buildSideKeyGenerator, input.size)
} else {
HashedRelation(
- input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
+ input.iterator, buildSideKeyGenerator, input.size)
}
sparkContext.broadcast(hashed)
}
@@ -108,10 +100,6 @@ case class BroadcastHashJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
- val numStreamedRows = buildSide match {
- case BuildLeft => longMetric("numRightRows")
- case BuildRight => longMetric("numLeftRows")
- }
val numOutputRows = longMetric("numOutputRows")
val broadcastRelation = Await.result(broadcastFuture, timeout)
@@ -119,7 +107,7 @@ case class BroadcastHashJoin(
streamedPlan.execute().mapPartitions { streamedIter =>
val hashedRelation = broadcastRelation.value
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
- hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
+ hashJoin(streamedIter, hashedRelation, numOutputRows)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index ad3275696e..5e8c8ca043 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -44,8 +44,6 @@ case class BroadcastHashOuterJoin(
right: SparkPlan) extends BinaryNode with HashOuterJoin {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
val timeout = {
@@ -66,14 +64,6 @@ case class BroadcastHashOuterJoin(
// for the same query.
@transient
private lazy val broadcastFuture = {
- val numBuildRows = joinType match {
- case RightOuter => longMetric("numLeftRows")
- case LeftOuter => longMetric("numRightRows")
- case x =>
- throw new IllegalArgumentException(
- s"HashOuterJoin should not take $x as the JoinType")
- }
-
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
Future {
@@ -83,14 +73,9 @@ case class BroadcastHashOuterJoin(
// Note that we use .execute().collect() because we don't want to convert data to Scala
// types
val input: Array[InternalRow] = buildPlan.execute().map { row =>
- numBuildRows += 1
row.copy()
}.collect()
- // The following line doesn't run in a job so we cannot track the metric value. However, we
- // have already tracked it in the above lines. So here we can use
- // `SQLMetrics.nullLongMetric` to ignore it.
- val hashed = HashedRelation(
- input.iterator, SQLMetrics.nullLongMetric, buildKeyGenerator, input.size)
+ val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
sparkContext.broadcast(hashed)
}
}(BroadcastHashJoin.broadcastHashJoinExecutionContext)
@@ -101,13 +86,6 @@ case class BroadcastHashOuterJoin(
}
override def doExecute(): RDD[InternalRow] = {
- val numStreamedRows = joinType match {
- case RightOuter => longMetric("numRightRows")
- case LeftOuter => longMetric("numLeftRows")
- case x =>
- throw new IllegalArgumentException(
- s"HashOuterJoin should not take $x as the JoinType")
- }
val numOutputRows = longMetric("numOutputRows")
val broadcastRelation = Await.result(broadcastFuture, timeout)
@@ -122,7 +100,6 @@ case class BroadcastHashOuterJoin(
joinType match {
case LeftOuter =>
streamedIter.flatMap(currentRow => {
- numStreamedRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
@@ -130,7 +107,6 @@ case class BroadcastHashOuterJoin(
case RightOuter =>
streamedIter.flatMap(currentRow => {
- numStreamedRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index d0e18dfcf3..4f1cfd2e81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -36,36 +36,31 @@ case class BroadcastLeftSemiJoinHash(
condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
val input = right.execute().map { row =>
- numRightRows += 1
row.copy()
}.collect()
if (condition.isEmpty) {
- val hashSet = buildKeyHashSet(input.toIterator, SQLMetrics.nullLongMetric)
+ val hashSet = buildKeyHashSet(input.toIterator)
val broadcastedRelation = sparkContext.broadcast(hashSet)
left.execute().mapPartitionsInternal { streamIter =>
- hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, numOutputRows)
+ hashSemiJoin(streamIter, broadcastedRelation.value, numOutputRows)
}
} else {
val hashRelation =
- HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, rightKeyGenerator, input.size)
+ HashedRelation(input.toIterator, rightKeyGenerator, input.size)
val broadcastedRelation = sparkContext.broadcast(hashRelation)
left.execute().mapPartitionsInternal { streamIter =>
val hashedRelation = broadcastedRelation.value
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
- hashSemiJoin(streamIter, numLeftRows, hashedRelation, numOutputRows)
+ hashSemiJoin(streamIter, hashedRelation, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index e55f869478..4585cbda92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -36,8 +36,6 @@ case class BroadcastNestedLoopJoin(
// TODO: Override requiredChildDistribution.
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
/** BuildRight means the right relation <=> the broadcast relation. */
@@ -73,15 +71,10 @@ case class BroadcastNestedLoopJoin(
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected override def doExecute(): RDD[InternalRow] = {
- val (numStreamedRows, numBuildRows) = buildSide match {
- case BuildRight => (longMetric("numLeftRows"), longMetric("numRightRows"))
- case BuildLeft => (longMetric("numRightRows"), longMetric("numLeftRows"))
- }
val numOutputRows = longMetric("numOutputRows")
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map { row =>
- numBuildRows += 1
row.copy()
}.collect().toIndexedSeq)
@@ -98,7 +91,6 @@ case class BroadcastNestedLoopJoin(
streamedIter.foreach { streamedRow =>
var i = 0
var streamRowMatched = false
- numStreamedRows += 1
while (i < broadcastedRelation.value.size) {
val broadcastedRow = broadcastedRelation.value(i)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 93d32e1fb9..e417079b61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -82,23 +82,13 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
override def output: Seq[Attribute] = left.output ++ right.output
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
- val leftResults = left.execute().map { row =>
- numLeftRows += 1
- row.asInstanceOf[UnsafeRow]
- }
- val rightResults = right.execute().map { row =>
- numRightRows += 1
- row.asInstanceOf[UnsafeRow]
- }
+ val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
+ val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]
val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size)
pair.mapPartitionsInternal { iter =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index ecbb1ac64b..332a748d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -99,7 +99,6 @@ trait HashJoin {
protected def hashJoin(
streamIter: Iterator[InternalRow],
- numStreamRows: LongSQLMetric,
hashedRelation: HashedRelation,
numOutputRows: LongSQLMetric): Iterator[InternalRow] =
{
@@ -126,7 +125,6 @@ trait HashJoin {
// find the next match
while (currentHashMatches == null && streamIter.hasNext) {
currentStreamedRow = streamIter.next()
- numStreamRows += 1
val key = joinKeys(currentStreamedRow)
if (!key.anyNull) {
currentHashMatches = hashedRelation.get(key)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index 3e0f74cd98..0220e0b8a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -43,14 +43,13 @@ trait HashSemiJoin {
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected def buildKeyHashSet(
- buildIter: Iterator[InternalRow], numBuildRows: LongSQLMetric): java.util.Set[InternalRow] = {
+ buildIter: Iterator[InternalRow]): java.util.Set[InternalRow] = {
val hashSet = new java.util.HashSet[InternalRow]()
// Create a Hash set of buildKeys
val rightKey = rightKeyGenerator
while (buildIter.hasNext) {
val currentRow = buildIter.next()
- numBuildRows += 1
val rowKey = rightKey(currentRow)
if (!rowKey.anyNull) {
val keyExists = hashSet.contains(rowKey)
@@ -65,12 +64,10 @@ trait HashSemiJoin {
protected def hashSemiJoin(
streamIter: Iterator[InternalRow],
- numStreamRows: LongSQLMetric,
hashSet: java.util.Set[InternalRow],
numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val joinKeys = leftKeyGenerator
streamIter.filter(current => {
- numStreamRows += 1
val key = joinKeys(current)
val r = !key.anyNull && hashSet.contains(key)
if (r) numOutputRows += 1
@@ -80,13 +77,11 @@ trait HashSemiJoin {
protected def hashSemiJoin(
streamIter: Iterator[InternalRow],
- numStreamRows: LongSQLMetric,
hashedRelation: HashedRelation,
numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val joinKeys = leftKeyGenerator
val joinedRow = new JoinedRow
streamIter.filter { current =>
- numStreamRows += 1
val key = joinKeys(current)
lazy val rowBuffer = hashedRelation.get(key)
val r = !key.anyNull && rowBuffer != null && rowBuffer.exists {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index eb6930a14f..0978570d42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -159,18 +159,17 @@ private[joins] class UniqueKeyHashedRelation(
private[execution] object HashedRelation {
def apply(localNode: LocalNode, keyGenerator: Projection): HashedRelation = {
- apply(localNode.asIterator, SQLMetrics.nullLongMetric, keyGenerator)
+ apply(localNode.asIterator, keyGenerator)
}
def apply(
input: Iterator[InternalRow],
- numInputRows: LongSQLMetric,
keyGenerator: Projection,
sizeEstimate: Int = 64): HashedRelation = {
if (keyGenerator.isInstanceOf[UnsafeProjection]) {
return UnsafeHashedRelation(
- input, numInputRows, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
+ input, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
}
// TODO: Use Spark's HashMap implementation.
@@ -184,7 +183,6 @@ private[execution] object HashedRelation {
// Create a mapping of buildKeys -> rows
while (input.hasNext) {
currentRow = input.next()
- numInputRows += 1
val rowKey = keyGenerator(currentRow)
if (!rowKey.anyNull) {
val existingMatchList = hashTable.get(rowKey)
@@ -427,7 +425,6 @@ private[joins] object UnsafeHashedRelation {
def apply(
input: Iterator[InternalRow],
- numInputRows: LongSQLMetric,
keyGenerator: UnsafeProjection,
sizeEstimate: Int): HashedRelation = {
@@ -437,7 +434,6 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
while (input.hasNext) {
val unsafeRow = input.next().asInstanceOf[UnsafeRow]
- numInputRows += 1
val rowKey = keyGenerator(unsafeRow)
if (!rowKey.anyNull) {
val existingMatchList = hashTable.get(rowKey)
@@ -604,7 +600,6 @@ private[joins] object LongHashedRelation {
def apply(
input: Iterator[InternalRow],
- numInputRows: LongSQLMetric,
keyGenerator: Projection,
sizeEstimate: Int): HashedRelation = {
@@ -619,7 +614,6 @@ private[joins] object LongHashedRelation {
while (input.hasNext) {
val unsafeRow = input.next().asInstanceOf[UnsafeRow]
numFields = unsafeRow.numFields()
- numInputRows += 1
val rowKey = keyGenerator(unsafeRow)
if (!rowKey.anyNull) {
val key = rowKey.getLong(0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index 82498ee395..ce758d63b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -34,8 +34,6 @@ case class LeftSemiJoinBNL(
// TODO: Override requiredChildDistribution.
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def outputPartitioning: Partitioning = streamed.outputPartitioning
@@ -52,13 +50,10 @@ case class LeftSemiJoinBNL(
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map { row =>
- numRightRows += 1
row.copy()
}.collect().toIndexedSeq)
@@ -66,7 +61,6 @@ case class LeftSemiJoinBNL(
val joinedRow = new JoinedRow
streamedIter.filter(streamedRow => {
- numLeftRows += 1
var i = 0
var matched = false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 25b3b5ca23..d8d3045ccf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -36,8 +36,6 @@ case class LeftSemiJoinHash(
condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def outputPartitioning: Partitioning = left.outputPartitioning
@@ -46,17 +44,15 @@ case class LeftSemiJoinHash(
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
if (condition.isEmpty) {
- val hashSet = buildKeyHashSet(buildIter, numRightRows)
- hashSemiJoin(streamIter, numLeftRows, hashSet, numOutputRows)
+ val hashSet = buildKeyHashSet(buildIter)
+ hashSemiJoin(streamIter, hashSet, numOutputRows)
} else {
- val hashRelation = HashedRelation(buildIter, numRightRows, rightKeyGenerator)
- hashSemiJoin(streamIter, numLeftRows, hashRelation, numOutputRows)
+ val hashRelation = HashedRelation(buildIter, rightKeyGenerator)
+ hashSemiJoin(streamIter, hashRelation, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 322a954b4f..cd8a5670e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -37,8 +37,6 @@ case class SortMergeJoin(
right: SparkPlan) extends BinaryNode {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def output: Seq[Attribute] = left.output ++ right.output
@@ -60,8 +58,6 @@ case class SortMergeJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
@@ -89,9 +85,7 @@ case class SortMergeJoin(
rightKeyGenerator,
keyOrdering,
RowIterator.fromScala(leftIter),
- numLeftRows,
- RowIterator.fromScala(rightIter),
- numRightRows
+ RowIterator.fromScala(rightIter)
)
private[this] val joinRow = new JoinedRow
private[this] val resultProjection: (InternalRow) => InternalRow =
@@ -157,9 +151,7 @@ private[joins] class SortMergeJoinScanner(
bufferedKeyGenerator: Projection,
keyOrdering: Ordering[InternalRow],
streamedIter: RowIterator,
- numStreamedRows: LongSQLMetric,
- bufferedIter: RowIterator,
- numBufferedRows: LongSQLMetric) {
+ bufferedIter: RowIterator) {
private[this] var streamedRow: InternalRow = _
private[this] var streamedRowKey: InternalRow = _
private[this] var bufferedRow: InternalRow = _
@@ -284,7 +276,6 @@ private[joins] class SortMergeJoinScanner(
if (streamedIter.advanceNext()) {
streamedRow = streamedIter.getRow
streamedRowKey = streamedKeyGenerator(streamedRow)
- numStreamedRows += 1
true
} else {
streamedRow = null
@@ -302,7 +293,6 @@ private[joins] class SortMergeJoinScanner(
while (!foundRow && bufferedIter.advanceNext()) {
bufferedRow = bufferedIter.getRow
bufferedRowKey = bufferedKeyGenerator(bufferedRow)
- numBufferedRows += 1
foundRow = !bufferedRowKey.anyNull
}
if (!foundRow) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index ed41ad2a00..40a6c93b59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -40,8 +40,6 @@ case class SortMergeOuterJoin(
right: SparkPlan) extends BinaryNode {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def output: Seq[Attribute] = {
@@ -96,8 +94,6 @@ case class SortMergeOuterJoin(
UnsafeProjection.create(rightKeys, right.output)
override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
@@ -119,9 +115,7 @@ case class SortMergeOuterJoin(
bufferedKeyGenerator = createRightKeyGenerator(),
keyOrdering,
streamedIter = RowIterator.fromScala(leftIter),
- numLeftRows,
- bufferedIter = RowIterator.fromScala(rightIter),
- numRightRows
+ bufferedIter = RowIterator.fromScala(rightIter)
)
val rightNullRow = new GenericInternalRow(right.output.length)
new LeftOuterIterator(
@@ -133,9 +127,7 @@ case class SortMergeOuterJoin(
bufferedKeyGenerator = createLeftKeyGenerator(),
keyOrdering,
streamedIter = RowIterator.fromScala(rightIter),
- numRightRows,
- bufferedIter = RowIterator.fromScala(leftIter),
- numLeftRows
+ bufferedIter = RowIterator.fromScala(leftIter)
)
val leftNullRow = new GenericInternalRow(left.output.length)
new RightOuterIterator(
@@ -149,9 +141,7 @@ case class SortMergeOuterJoin(
rightKeyGenerator = createRightKeyGenerator(),
keyOrdering,
leftIter = RowIterator.fromScala(leftIter),
- numLeftRows,
rightIter = RowIterator.fromScala(rightIter),
- numRightRows,
boundCondition,
leftNullRow,
rightNullRow)
@@ -289,9 +279,7 @@ private class SortMergeFullOuterJoinScanner(
rightKeyGenerator: Projection,
keyOrdering: Ordering[InternalRow],
leftIter: RowIterator,
- numLeftRows: LongSQLMetric,
rightIter: RowIterator,
- numRightRows: LongSQLMetric,
boundCondition: InternalRow => Boolean,
leftNullRow: InternalRow,
rightNullRow: InternalRow) {
@@ -321,7 +309,6 @@ private class SortMergeFullOuterJoinScanner(
if (leftIter.advanceNext()) {
leftRow = leftIter.getRow
leftRowKey = leftKeyGenerator(leftRow)
- numLeftRows += 1
true
} else {
leftRow = null
@@ -338,7 +325,6 @@ private class SortMergeFullOuterJoinScanner(
if (rightIter.advanceNext()) {
rightRow = rightIter.getRow
rightRowKey = rightKeyGenerator(rightRow)
- numRightRows += 1
true
} else {
rightRow = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index f985dfbd8a..04dd809df1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -36,8 +36,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
test("GeneralHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
- val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
- val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
+ val hashed = HashedRelation(data.iterator, keyProjection)
assert(hashed.isInstanceOf[GeneralHashedRelation])
assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -47,13 +46,11 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
val data2 = CompactBuffer[InternalRow](data(2))
data2 += data(2)
assert(hashed.get(data(2)) === data2)
- assert(numDataRows.value.value === data.length)
}
test("UniqueKeyHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2))
- val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
- val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
+ val hashed = HashedRelation(data.iterator, keyProjection)
assert(hashed.isInstanceOf[UniqueKeyHashedRelation])
assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -66,19 +63,17 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
assert(uniqHashed.getValue(data(1)) === data(1))
assert(uniqHashed.getValue(data(2)) === data(2))
assert(uniqHashed.getValue(InternalRow(10)) === null)
- assert(numDataRows.value.value === data.length)
}
test("UnsafeHashedRelation") {
val schema = StructType(StructField("a", IntegerType, true) :: Nil)
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
- val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
val toUnsafe = UnsafeProjection.create(schema)
val unsafeData = data.map(toUnsafe(_).copy()).toArray
val buildKey = Seq(BoundReference(0, IntegerType, false))
val keyGenerator = UnsafeProjection.create(buildKey)
- val hashed = UnsafeHashedRelation(unsafeData.iterator, numDataRows, keyGenerator, 1)
+ val hashed = UnsafeHashedRelation(unsafeData.iterator, keyGenerator, 1)
assert(hashed.isInstanceOf[UnsafeHashedRelation])
assert(hashed.get(unsafeData(0)) === CompactBuffer[InternalRow](unsafeData(0)))
@@ -100,7 +95,6 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
assert(hashed2.get(unsafeData(1)) === CompactBuffer[InternalRow](unsafeData(1)))
assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
assert(hashed2.get(unsafeData(2)) === data2)
- assert(numDataRows.value.value === data.length)
val os2 = new ByteArrayOutputStream()
val out2 = new ObjectOutputStream(os2)
@@ -139,7 +133,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
Seq(BoundReference(0, IntegerType, false), BoundReference(1, IntegerType, true)))
val rows = (0 until 100).map(i => unsafeProj(InternalRow(i, i + 1)).copy())
val keyProj = UnsafeProjection.create(Seq(BoundReference(0, IntegerType, false)))
- val longRelation = LongHashedRelation(rows.iterator, SQLMetrics.nullLongMetric, keyProj, 100)
+ val longRelation = LongHashedRelation(rows.iterator, keyProj, 100)
assert(longRelation.isInstanceOf[LongArrayRelation])
val longArrayRelation = longRelation.asInstanceOf[LongArrayRelation]
(0 until 100).foreach { i =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 2260e48702..d24625a535 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -113,23 +113,12 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("Project metrics") {
- // Assume the execution plan is
- // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
- val df = person.select('name)
- testSparkPlanMetrics(df, 1, Map(
- 0L -> ("Project", Map(
- "number of rows" -> 2L)))
- )
- }
-
test("Filter metrics") {
// Assume the execution plan is
// PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
val df = person.filter('age < 25)
testSparkPlanMetrics(df, 1, Map(
0L -> ("Filter", Map(
- "number of input rows" -> 2L,
"number of output rows" -> 1L)))
)
}
@@ -149,10 +138,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = testData2.groupBy().count() // 2 partitions
testSparkPlanMetrics(df, 1, Map(
2L -> ("TungstenAggregate", Map(
- "number of input rows" -> 6L,
"number of output rows" -> 2L)),
0L -> ("TungstenAggregate", Map(
- "number of input rows" -> 2L,
"number of output rows" -> 1L)))
)
@@ -160,10 +147,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df2 = testData2.groupBy('a).count()
testSparkPlanMetrics(df2, 1, Map(
2L -> ("TungstenAggregate", Map(
- "number of input rows" -> 6L,
"number of output rows" -> 4L)),
0L -> ("TungstenAggregate", Map(
- "number of input rows" -> 4L,
"number of output rows" -> 3L)))
)
}
@@ -181,8 +166,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
testSparkPlanMetrics(df, 1, Map(
1L -> ("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
- "number of left rows" -> 4L,
- "number of right rows" -> 2L,
"number of output rows" -> 4L)))
)
}
@@ -201,8 +184,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
testSparkPlanMetrics(df, 1, Map(
1L -> ("SortMergeOuterJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
- "number of left rows" -> 6L,
- "number of right rows" -> 2L,
"number of output rows" -> 8L)))
)
@@ -211,8 +192,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
testSparkPlanMetrics(df2, 1, Map(
1L -> ("SortMergeOuterJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
- "number of left rows" -> 2L,
- "number of right rows" -> 6L,
"number of output rows" -> 8L)))
)
}
@@ -226,8 +205,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(broadcast(df2), "key")
testSparkPlanMetrics(df, 2, Map(
1L -> ("BroadcastHashJoin", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
"number of output rows" -> 2L)))
)
}
@@ -240,16 +217,12 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
testSparkPlanMetrics(df, 2, Map(
0L -> ("BroadcastHashOuterJoin", Map(
- "number of left rows" -> 3L,
- "number of right rows" -> 4L,
"number of output rows" -> 5L)))
)
val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
testSparkPlanMetrics(df3, 2, Map(
0L -> ("BroadcastHashOuterJoin", Map(
- "number of left rows" -> 3L,
- "number of right rows" -> 4L,
"number of output rows" -> 6L)))
)
}
@@ -265,8 +238,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
"testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
testSparkPlanMetrics(df, 3, Map(
1L -> ("BroadcastNestedLoopJoin", Map(
- "number of left rows" -> 12L, // left needs to be scanned twice
- "number of right rows" -> 2L,
"number of output rows" -> 12L)))
)
}
@@ -280,8 +251,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
testSparkPlanMetrics(df, 2, Map(
0L -> ("BroadcastLeftSemiJoinHash", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
"number of output rows" -> 2L)))
)
}
@@ -295,8 +264,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(df2, $"key" === $"key2", "leftsemi")
testSparkPlanMetrics(df, 1, Map(
0L -> ("LeftSemiJoinHash", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
"number of output rows" -> 2L)))
)
}
@@ -310,8 +277,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(df2, $"key" < $"key2", "leftsemi")
testSparkPlanMetrics(df, 2, Map(
0L -> ("LeftSemiJoinBNL", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
"number of output rows" -> 2L)))
)
}
@@ -326,8 +291,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
"SELECT * FROM testData2 JOIN testDataForJoin")
testSparkPlanMetrics(df, 1, Map(
1L -> ("CartesianProduct", Map(
- "number of left rows" -> 12L, // left needs to be scanned twice
- "number of right rows" -> 4L, // right is read twice
"number of output rows" -> 12L)))
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index a3e5243b68..d3191d3aea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -92,7 +92,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
- metrics += qe.executedPlan.longMetric("numInputRows").value.value
+ metrics += qe.executedPlan.longMetric("numOutputRows").value.value
}
}
sqlContext.listenerManager.register(listener)
@@ -105,9 +105,9 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
}
assert(metrics.length == 3)
- assert(metrics(0) == 1)
- assert(metrics(1) == 1)
- assert(metrics(2) == 2)
+ assert(metrics(0) === 1)
+ assert(metrics(1) === 1)
+ assert(metrics(2) === 2)
sqlContext.listenerManager.unregister(listener)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index eff8833e92..235b80b7c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.{BooleanType, DataType}
import org.apache.spark.util.Utils
@@ -52,6 +53,9 @@ case class HiveTableScan(
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
"Partition pruning predicates only supported for partitioned tables.")
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def producedAttributes: AttributeSet = outputSet ++
AttributeSet(partitionPruningPred.flatMap(_.references))
@@ -146,9 +150,13 @@ case class HiveTableScan(
prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
}
}
+ val numOutputRows = longMetric("numOutputRows")
rdd.mapPartitionsInternal { iter =>
val proj = UnsafeProjection.create(schema)
- iter.map(proj)
+ iter.map { r =>
+ numOutputRows += 1
+ proj(r)
+ }
}
}