From 8f744fe3d931c2380613b8e5bafa1bb1fd292839 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 10 Feb 2016 23:23:01 -0800
Subject: [SPARK-13234] [SQL] remove duplicated SQL metrics

Many SQL operators track metrics for both input and output rows. Since an operator's number of input rows is always exactly the number of output rows of its child, it is enough to keep metrics for output rows only. Now that whole-stage codegen has made operators much faster, the overhead of SQL metrics is no longer trivial, so we should avoid it wherever it is unnecessary.

This PR removes all the SQL metrics for number of input rows, adds a "number of output rows" SQL metric to every LeafNode, and also removes the SQL metrics from operators that output exactly as many rows as they consume (for example, Project), where the count adds no information.

The new SQL UI looks like:

![metrics](https://cloud.githubusercontent.com/assets/40902/12965227/63614e5e-d009-11e5-88b3-84fea04f9c20.png)

Author: Davies Liu

Closes #11163 from davies/remove_metrics.
---
 .../apache/spark/sql/execution/ExistingRDD.scala   | 12 ++++++-
 .../spark/sql/execution/LocalTableScan.scala       | 12 ++++++-
 .../execution/aggregate/SortBasedAggregate.scala   |  3 --
 .../aggregate/SortBasedAggregationIterator.scala   |  3 --
 .../execution/aggregate/TungstenAggregate.scala    |  3 --
 .../aggregate/TungstenAggregationIterator.scala    |  3 --
 .../spark/sql/execution/basicOperators.scala       | 21 ++++-------
 .../columnar/InMemoryColumnarTableScan.scala       | 14 +++++++-
 .../sql/execution/joins/BroadcastHashJoin.scala    | 18 ++---------
 .../execution/joins/BroadcastHashOuterJoin.scala   | 26 +--------------
 .../joins/BroadcastLeftSemiJoinHash.scala          | 13 +++-----
 .../execution/joins/BroadcastNestedLoopJoin.scala  |  8 -----
 .../sql/execution/joins/CartesianProduct.scala     | 14 ++------
 .../spark/sql/execution/joins/HashJoin.scala       |  2 --
 .../spark/sql/execution/joins/HashSemiJoin.scala   |  7 +---
 .../spark/sql/execution/joins/HashedRelation.scala | 10 ++----
 .../sql/execution/joins/LeftSemiJoinBNL.scala      |  6 ----
 .../sql/execution/joins/LeftSemiJoinHash.scala     | 12 +++----
 .../spark/sql/execution/joins/SortMergeJoin.scala  | 14 ++------
 .../sql/execution/joins/SortMergeOuterJoin.scala   | 18 ++---------
 .../sql/execution/joins/HashedRelationSuite.scala  | 14 +++-----
 .../sql/execution/metric/SQLMetricsSuite.scala     | 37 ----------------------
 .../spark/sql/util/DataFrameCallbackSuite.scala    |  8 ++---
 .../spark/sql/hive/execution/HiveTableScan.scala   | 10 +++++-
 24 files changed, 80 insertions(+), 208 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 92cfd5f841..cad7e25a32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
 import org.apache.spark.sql.types.DataType
@@ -103,8 +104,11 @@ private[sql] case class PhysicalRDD(
     override val outputPartitioning: Partitioning = UnknownPartitioning(0))
   extends LeafNode {
+  private[sql] override lazy val
metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + protected override def doExecute(): RDD[InternalRow] = { - if (isUnsafeRow) { + val unsafeRow = if (isUnsafeRow) { rdd } else { rdd.mapPartitionsInternal { iter => @@ -112,6 +116,12 @@ private[sql] case class PhysicalRDD( iter.map(proj) } } + + val numOutputRows = longMetric("numOutputRows") + unsafeRow.map { r => + numOutputRows += 1 + r + } } override def simpleString: String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala index 59057bf966..f8aec9e7a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} +import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -29,6 +30,9 @@ private[sql] case class LocalTableScan( output: Seq[Attribute], rows: Seq[InternalRow]) extends LeafNode { + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + private val unsafeRows: Array[InternalRow] = { val proj = UnsafeProjection.create(output, output) rows.map(r => proj(r).copy()).toArray @@ -36,7 +40,13 @@ private[sql] case class LocalTableScan( private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows) - protected override def doExecute(): RDD[InternalRow] = rdd + protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + rdd.map { r => + numOutputRows += 1 + r + } + } override def executeCollect(): Array[InternalRow] = { unsafeRows diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala index 06a3991459..9fcfea8381 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala @@ -46,7 +46,6 @@ case class SortBasedAggregate( AttributeSet(aggregateBufferAttributes) override private[sql] lazy val metrics = Map( - "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) @@ -68,7 +67,6 @@ case class SortBasedAggregate( } protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { - val numInputRows = longMetric("numInputRows") val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsInternal { iter => // Because the constructor of an aggregation iterator will read at least the first row, @@ -89,7 +87,6 @@ case class SortBasedAggregate( resultExpressions, (expressions, inputSchema) => newMutableProjection(expressions, inputSchema, subexpressionEliminationEnabled), - numInputRows, numOutputRows) if (!hasInput && groupingExpressions.isEmpty) { // There is no input and there is no grouping expressions. 
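The scans above (PhysicalRDD, LocalTableScan) and the aggregate changes show the pattern the rest of the patch repeats: leaf operators gain a `numOutputRows` metric and bump it once per emitted row, while non-leaf operators drop `numInputRows` because it always equals the child's `numOutputRows`. Below is a minimal standalone sketch of that counting pattern; `LongMetric` here is a simplified, hypothetical stand-in for Spark's `LongSQLMetric` accumulator, not the real `SQLMetrics` API:

```scala
// Hypothetical, simplified stand-in for Spark's LongSQLMetric accumulator.
final class LongMetric(val name: String) {
  private var count: Long = 0L
  def +=(n: Long): Unit = count += n
  def value: Long = count
}

object OutputRowCounting {
  // Wrap an iterator so each row that flows through bumps the metric once,
  // mirroring the `rdd.map { r => numOutputRows += 1; r }` wrapping this patch
  // adds to leaf nodes such as PhysicalRDD and LocalTableScan.
  def countOutput[T](rows: Iterator[T], numOutputRows: LongMetric): Iterator[T] =
    rows.map { r => numOutputRows += 1; r }

  def main(args: Array[String]): Unit = {
    val numOutputRows = new LongMetric("number of output rows")
    val consumed = countOutput(Iterator("a", "b", "c"), numOutputRows).toList
    // A downstream operator needs no separate "number of input rows" metric:
    // its input count is, by construction, the upstream numOutputRows value.
    assert(consumed.size == 3 && numOutputRows.value == 3L)
    println(s"${numOutputRows.name} = ${numOutputRows.value}")
  }
}
```

Counting once on the producer side, instead of on both sides of every operator boundary, removes per-row accumulator updates without losing any information in the SQL UI.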
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala index 6501634ff9..8f974980bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala @@ -35,7 +35,6 @@ class SortBasedAggregationIterator( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection), - numInputRows: LongSQLMetric, numOutputRows: LongSQLMetric) extends AggregationIterator( groupingExpressions, @@ -97,7 +96,6 @@ class SortBasedAggregationIterator( val inputRow = inputIterator.next() nextGroupingKey = groupingProjection(inputRow).copy() firstRowInNextGroup = inputRow.copy() - numInputRows += 1 sortedInputHasNewGroup = true } else { // This inputIter is empty. @@ -122,7 +120,6 @@ class SortBasedAggregationIterator( // Get the grouping key. val currentRow = inputIterator.next() val groupingKey = groupingProjection(currentRow) - numInputRows += 1 // Check if the current row belongs the current input row. if (currentGroupingKey == groupingKey) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 340b8f78e5..a6950f805a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -47,7 +47,6 @@ case class TungstenAggregate( require(TungstenAggregate.supportsAggregate(aggregateBufferAttributes)) override private[sql] lazy val metrics = Map( - "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) @@ -77,7 +76,6 @@ case class TungstenAggregate( } protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { - val numInputRows = longMetric("numInputRows") val numOutputRows = longMetric("numOutputRows") val dataSize = longMetric("dataSize") val spillSize = longMetric("spillSize") @@ -102,7 +100,6 @@ case class TungstenAggregate( child.output, iter, testFallbackStartsAt, - numInputRows, numOutputRows, dataSize, spillSize) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index 001e9c306a..c4f6594835 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -85,7 +85,6 @@ class TungstenAggregationIterator( originalInputAttributes: Seq[Attribute], inputIter: Iterator[InternalRow], testFallbackStartsAt: Option[Int], - numInputRows: LongSQLMetric, numOutputRows: LongSQLMetric, dataSize: LongSQLMetric, spillSize: LongSQLMetric) @@ -179,14 +178,12 @@ class TungstenAggregationIterator( val buffer: UnsafeRow = 
hashMap.getAggregationBufferFromUnsafeRow(groupingKey) while (inputIter.hasNext) { val newInput = inputIter.next() - numInputRows += 1 processRow(buffer, newInput) } } else { var i = 0 while (inputIter.hasNext) { val newInput = inputIter.next() - numInputRows += 1 val groupingKey = groupingProjection.apply(newInput) var buffer: UnsafeRow = null if (i < fallbackStartsAt) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index f63e8a9b6d..949acb9aca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -29,9 +29,6 @@ import org.apache.spark.util.random.PoissonSampler case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode with CodegenSupport { - override private[sql] lazy val metrics = Map( - "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows")) - override def output: Seq[Attribute] = projectList.map(_.toAttribute) override def upstream(): RDD[InternalRow] = { @@ -55,14 +52,10 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) } protected override def doExecute(): RDD[InternalRow] = { - val numRows = longMetric("numRows") child.execute().mapPartitionsInternal { iter => val project = UnsafeProjection.create(projectList, child.output, subexpressionEliminationEnabled) - iter.map { row => - numRows += 1 - project(row) - } + iter.map(project) } } @@ -74,7 +67,6 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit override def output: Seq[Attribute] = child.output private[sql] override lazy val metrics = Map( - "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) override def upstream(): RDD[InternalRow] = { @@ -104,12 +96,10 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit } protected override def doExecute(): RDD[InternalRow] = { - val numInputRows = longMetric("numInputRows") val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsInternal { iter => val predicate = newPredicate(condition, child.output) iter.filter { row => - numInputRows += 1 val r = predicate(row) if (r) numOutputRows += 1 r @@ -135,9 +125,7 @@ case class Sample( upperBound: Double, withReplacement: Boolean, seed: Long, - child: SparkPlan) - extends UnaryNode -{ + child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output protected override def doExecute(): RDD[InternalRow] = { @@ -163,6 +151,9 @@ case class Range( output: Seq[Attribute]) extends LeafNode with CodegenSupport { + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + override def upstream(): RDD[InternalRow] = { sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i)) } @@ -241,6 +232,7 @@ case class Range( } protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") sqlContext .sparkContext .parallelize(0 until numSlices, numSlices) @@ -283,6 +275,7 @@ case class Range( overflow = true } + numOutputRows += 1 unsafeRow.setLong(0, ret) unsafeRow } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala index 9084b74d1a..4858140229 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{LeafNode, SparkPlan} +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.UserDefinedType import org.apache.spark.storage.StorageLevel @@ -216,6 +217,9 @@ private[sql] case class InMemoryColumnarTableScan( @transient relation: InMemoryRelation) extends LeafNode { + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + override def output: Seq[Attribute] = attributes // The cached version does not change the outputPartitioning of the original SparkPlan. @@ -286,6 +290,8 @@ private[sql] case class InMemoryColumnarTableScan( private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + if (enableAccumulators) { readPartitions.setValue(0) readBatches.setValue(0) @@ -332,12 +338,18 @@ private[sql] case class InMemoryColumnarTableScan( cachedBatchIterator } + // update SQL metrics + val withMetrics = cachedBatchesToScan.map { batch => + numOutputRows += batch.numRows + batch + } + val columnTypes = requestedColumnDataTypes.map { case udt: UserDefinedType[_] => udt.sqlType case other => other }.toArray val columnarIterator = GenerateColumnAccessor.generate(columnTypes) - columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray) + columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray) if (enableAccumulators && columnarIterator.hasNext) { readPartitions += 1 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index cbd549763a..35c7963b48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -48,8 +48,6 @@ case class BroadcastHashJoin( extends BinaryNode with HashJoin with CodegenSupport { override private[sql] lazy val metrics = Map( - "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"), - "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) val timeout: Duration = { @@ -70,11 +68,6 @@ case class BroadcastHashJoin( // for the same query. @transient private lazy val broadcastFuture = { - val numBuildRows = buildSide match { - case BuildLeft => longMetric("numLeftRows") - case BuildRight => longMetric("numRightRows") - } - // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here. 
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) Future { @@ -84,7 +77,6 @@ case class BroadcastHashJoin( // Note that we use .execute().collect() because we don't want to convert data to Scala // types val input: Array[InternalRow] = buildPlan.execute().map { row => - numBuildRows += 1 row.copy() }.collect() // The following line doesn't run in a job so we cannot track the metric value. However, we @@ -93,10 +85,10 @@ case class BroadcastHashJoin( // TODO: move this check into HashedRelation val hashed = if (canJoinKeyFitWithinLong) { LongHashedRelation( - input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size) + input.iterator, buildSideKeyGenerator, input.size) } else { HashedRelation( - input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size) + input.iterator, buildSideKeyGenerator, input.size) } sparkContext.broadcast(hashed) } @@ -108,10 +100,6 @@ case class BroadcastHashJoin( } protected override def doExecute(): RDD[InternalRow] = { - val numStreamedRows = buildSide match { - case BuildLeft => longMetric("numRightRows") - case BuildRight => longMetric("numLeftRows") - } val numOutputRows = longMetric("numOutputRows") val broadcastRelation = Await.result(broadcastFuture, timeout) @@ -119,7 +107,7 @@ case class BroadcastHashJoin( streamedPlan.execute().mapPartitions { streamedIter => val hashedRelation = broadcastRelation.value TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize) - hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows) + hashJoin(streamedIter, hashedRelation, numOutputRows) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala index ad3275696e..5e8c8ca043 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala @@ -44,8 +44,6 @@ case class BroadcastHashOuterJoin( right: SparkPlan) extends BinaryNode with HashOuterJoin { override private[sql] lazy val metrics = Map( - "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"), - "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) val timeout = { @@ -66,14 +64,6 @@ case class BroadcastHashOuterJoin( // for the same query. @transient private lazy val broadcastFuture = { - val numBuildRows = joinType match { - case RightOuter => longMetric("numLeftRows") - case LeftOuter => longMetric("numRightRows") - case x => - throw new IllegalArgumentException( - s"HashOuterJoin should not take $x as the JoinType") - } - // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here. val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) Future { @@ -83,14 +73,9 @@ case class BroadcastHashOuterJoin( // Note that we use .execute().collect() because we don't want to convert data to Scala // types val input: Array[InternalRow] = buildPlan.execute().map { row => - numBuildRows += 1 row.copy() }.collect() - // The following line doesn't run in a job so we cannot track the metric value. However, we - // have already tracked it in the above lines. So here we can use - // `SQLMetrics.nullLongMetric` to ignore it. 
- val hashed = HashedRelation( - input.iterator, SQLMetrics.nullLongMetric, buildKeyGenerator, input.size) + val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size) sparkContext.broadcast(hashed) } }(BroadcastHashJoin.broadcastHashJoinExecutionContext) @@ -101,13 +86,6 @@ case class BroadcastHashOuterJoin( } override def doExecute(): RDD[InternalRow] = { - val numStreamedRows = joinType match { - case RightOuter => longMetric("numRightRows") - case LeftOuter => longMetric("numLeftRows") - case x => - throw new IllegalArgumentException( - s"HashOuterJoin should not take $x as the JoinType") - } val numOutputRows = longMetric("numOutputRows") val broadcastRelation = Await.result(broadcastFuture, timeout) @@ -122,7 +100,6 @@ case class BroadcastHashOuterJoin( joinType match { case LeftOuter => streamedIter.flatMap(currentRow => { - numStreamedRows += 1 val rowKey = keyGenerator(currentRow) joinedRow.withLeft(currentRow) leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows) @@ -130,7 +107,6 @@ case class BroadcastHashOuterJoin( case RightOuter => streamedIter.flatMap(currentRow => { - numStreamedRows += 1 val rowKey = keyGenerator(currentRow) joinedRow.withRight(currentRow) rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala index d0e18dfcf3..4f1cfd2e81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala @@ -36,36 +36,31 @@ case class BroadcastLeftSemiJoinHash( condition: Option[Expression]) extends BinaryNode with HashSemiJoin { override private[sql] lazy val metrics = Map( - "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"), - "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { - val numLeftRows = longMetric("numLeftRows") - val numRightRows = longMetric("numRightRows") val numOutputRows = longMetric("numOutputRows") val input = right.execute().map { row => - numRightRows += 1 row.copy() }.collect() if (condition.isEmpty) { - val hashSet = buildKeyHashSet(input.toIterator, SQLMetrics.nullLongMetric) + val hashSet = buildKeyHashSet(input.toIterator) val broadcastedRelation = sparkContext.broadcast(hashSet) left.execute().mapPartitionsInternal { streamIter => - hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, numOutputRows) + hashSemiJoin(streamIter, broadcastedRelation.value, numOutputRows) } } else { val hashRelation = - HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, rightKeyGenerator, input.size) + HashedRelation(input.toIterator, rightKeyGenerator, input.size) val broadcastedRelation = sparkContext.broadcast(hashRelation) left.execute().mapPartitionsInternal { streamIter => val hashedRelation = broadcastedRelation.value TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize) - hashSemiJoin(streamIter, numLeftRows, hashedRelation, numOutputRows) + hashSemiJoin(streamIter, hashedRelation, numOutputRows) } } } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala index e55f869478..4585cbda92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala @@ -36,8 +36,6 @@ case class BroadcastNestedLoopJoin( // TODO: Override requiredChildDistribution. override private[sql] lazy val metrics = Map( - "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"), - "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) /** BuildRight means the right relation <=> the broadcast relation. */ @@ -73,15 +71,10 @@ case class BroadcastNestedLoopJoin( newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output) protected override def doExecute(): RDD[InternalRow] = { - val (numStreamedRows, numBuildRows) = buildSide match { - case BuildRight => (longMetric("numLeftRows"), longMetric("numRightRows")) - case BuildLeft => (longMetric("numRightRows"), longMetric("numLeftRows")) - } val numOutputRows = longMetric("numOutputRows") val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map { row => - numBuildRows += 1 row.copy() }.collect().toIndexedSeq) @@ -98,7 +91,6 @@ case class BroadcastNestedLoopJoin( streamedIter.foreach { streamedRow => var i = 0 var streamRowMatched = false - numStreamedRows += 1 while (i < broadcastedRelation.value.size) { val broadcastedRow = broadcastedRelation.value(i) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala index 93d32e1fb9..e417079b61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala @@ -82,23 +82,13 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod override def output: Seq[Attribute] = left.output ++ right.output override private[sql] lazy val metrics = Map( - "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"), - "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { - val numLeftRows = longMetric("numLeftRows") - val numRightRows = longMetric("numRightRows") val numOutputRows = longMetric("numOutputRows") - val leftResults = left.execute().map { row => - numLeftRows += 1 - row.asInstanceOf[UnsafeRow] - } - val rightResults = right.execute().map { row => - numRightRows += 1 - row.asInstanceOf[UnsafeRow] - } + val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]] + val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]] val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size) pair.mapPartitionsInternal { iter => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index ecbb1ac64b..332a748d3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -99,7 +99,6 @@ trait HashJoin { protected def hashJoin( streamIter: Iterator[InternalRow], - numStreamRows: LongSQLMetric, hashedRelation: HashedRelation, numOutputRows: LongSQLMetric): Iterator[InternalRow] = { @@ -126,7 +125,6 @@ trait HashJoin { // find the next match while (currentHashMatches == null && streamIter.hasNext) { currentStreamedRow = streamIter.next() - numStreamRows += 1 val key = joinKeys(currentStreamedRow) if (!key.anyNull) { currentHashMatches = hashedRelation.get(key) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala index 3e0f74cd98..0220e0b8a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala @@ -43,14 +43,13 @@ trait HashSemiJoin { newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output) protected def buildKeyHashSet( - buildIter: Iterator[InternalRow], numBuildRows: LongSQLMetric): java.util.Set[InternalRow] = { + buildIter: Iterator[InternalRow]): java.util.Set[InternalRow] = { val hashSet = new java.util.HashSet[InternalRow]() // Create a Hash set of buildKeys val rightKey = rightKeyGenerator while (buildIter.hasNext) { val currentRow = buildIter.next() - numBuildRows += 1 val rowKey = rightKey(currentRow) if (!rowKey.anyNull) { val keyExists = hashSet.contains(rowKey) @@ -65,12 +64,10 @@ trait HashSemiJoin { protected def hashSemiJoin( streamIter: Iterator[InternalRow], - numStreamRows: LongSQLMetric, hashSet: java.util.Set[InternalRow], numOutputRows: LongSQLMetric): Iterator[InternalRow] = { val joinKeys = leftKeyGenerator streamIter.filter(current => { - numStreamRows += 1 val key = joinKeys(current) val r = !key.anyNull && hashSet.contains(key) if (r) numOutputRows += 1 @@ -80,13 +77,11 @@ trait HashSemiJoin { protected def hashSemiJoin( streamIter: Iterator[InternalRow], - numStreamRows: LongSQLMetric, hashedRelation: HashedRelation, numOutputRows: LongSQLMetric): Iterator[InternalRow] = { val joinKeys = leftKeyGenerator val joinedRow = new JoinedRow streamIter.filter { current => - numStreamRows += 1 val key = joinKeys(current) lazy val rowBuffer = hashedRelation.get(key) val r = !key.anyNull && rowBuffer != null && rowBuffer.exists { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index eb6930a14f..0978570d42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -159,18 +159,17 @@ private[joins] class UniqueKeyHashedRelation( private[execution] object HashedRelation { def apply(localNode: LocalNode, keyGenerator: Projection): HashedRelation = { - apply(localNode.asIterator, SQLMetrics.nullLongMetric, keyGenerator) + apply(localNode.asIterator, keyGenerator) } def apply( input: Iterator[InternalRow], - numInputRows: LongSQLMetric, keyGenerator: Projection, sizeEstimate: Int = 64): HashedRelation = { if (keyGenerator.isInstanceOf[UnsafeProjection]) { return UnsafeHashedRelation( - input, numInputRows, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate) + input, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate) } // TODO: Use Spark's 
HashMap implementation. @@ -184,7 +183,6 @@ private[execution] object HashedRelation { // Create a mapping of buildKeys -> rows while (input.hasNext) { currentRow = input.next() - numInputRows += 1 val rowKey = keyGenerator(currentRow) if (!rowKey.anyNull) { val existingMatchList = hashTable.get(rowKey) @@ -427,7 +425,6 @@ private[joins] object UnsafeHashedRelation { def apply( input: Iterator[InternalRow], - numInputRows: LongSQLMetric, keyGenerator: UnsafeProjection, sizeEstimate: Int): HashedRelation = { @@ -437,7 +434,6 @@ private[joins] object UnsafeHashedRelation { // Create a mapping of buildKeys -> rows while (input.hasNext) { val unsafeRow = input.next().asInstanceOf[UnsafeRow] - numInputRows += 1 val rowKey = keyGenerator(unsafeRow) if (!rowKey.anyNull) { val existingMatchList = hashTable.get(rowKey) @@ -604,7 +600,6 @@ private[joins] object LongHashedRelation { def apply( input: Iterator[InternalRow], - numInputRows: LongSQLMetric, keyGenerator: Projection, sizeEstimate: Int): HashedRelation = { @@ -619,7 +614,6 @@ private[joins] object LongHashedRelation { while (input.hasNext) { val unsafeRow = input.next().asInstanceOf[UnsafeRow] numFields = unsafeRow.numFields() - numInputRows += 1 val rowKey = keyGenerator(unsafeRow) if (!rowKey.anyNull) { val key = rowKey.getLong(0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala index 82498ee395..ce758d63b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala @@ -34,8 +34,6 @@ case class LeftSemiJoinBNL( // TODO: Override requiredChildDistribution. 
override private[sql] lazy val metrics = Map( - "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"), - "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) override def outputPartitioning: Partitioning = streamed.outputPartitioning @@ -52,13 +50,10 @@ case class LeftSemiJoinBNL( newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output) protected override def doExecute(): RDD[InternalRow] = { - val numLeftRows = longMetric("numLeftRows") - val numRightRows = longMetric("numRightRows") val numOutputRows = longMetric("numOutputRows") val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map { row => - numRightRows += 1 row.copy() }.collect().toIndexedSeq) @@ -66,7 +61,6 @@ case class LeftSemiJoinBNL( val joinedRow = new JoinedRow streamedIter.filter(streamedRow => { - numLeftRows += 1 var i = 0 var matched = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala index 25b3b5ca23..d8d3045ccf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala @@ -36,8 +36,6 @@ case class LeftSemiJoinHash( condition: Option[Expression]) extends BinaryNode with HashSemiJoin { override private[sql] lazy val metrics = Map( - "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"), - "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) override def outputPartitioning: Partitioning = left.outputPartitioning @@ -46,17 +44,15 @@ case class LeftSemiJoinHash( ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil protected override def doExecute(): RDD[InternalRow] = { - val numLeftRows = longMetric("numLeftRows") - val numRightRows = longMetric("numRightRows") val numOutputRows = longMetric("numOutputRows") right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) => if (condition.isEmpty) { - val hashSet = buildKeyHashSet(buildIter, numRightRows) - hashSemiJoin(streamIter, numLeftRows, hashSet, numOutputRows) + val hashSet = buildKeyHashSet(buildIter) + hashSemiJoin(streamIter, hashSet, numOutputRows) } else { - val hashRelation = HashedRelation(buildIter, numRightRows, rightKeyGenerator) - hashSemiJoin(streamIter, numLeftRows, hashRelation, numOutputRows) + val hashRelation = HashedRelation(buildIter, rightKeyGenerator) + hashSemiJoin(streamIter, hashRelation, numOutputRows) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index 322a954b4f..cd8a5670e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -37,8 +37,6 @@ case class SortMergeJoin( right: SparkPlan) extends BinaryNode { override private[sql] lazy val metrics = Map( - "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"), - "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"), "numOutputRows" -> 
SQLMetrics.createLongMetric(sparkContext, "number of output rows")) override def output: Seq[Attribute] = left.output ++ right.output @@ -60,8 +58,6 @@ case class SortMergeJoin( } protected override def doExecute(): RDD[InternalRow] = { - val numLeftRows = longMetric("numLeftRows") - val numRightRows = longMetric("numRightRows") val numOutputRows = longMetric("numOutputRows") left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => @@ -89,9 +85,7 @@ case class SortMergeJoin( rightKeyGenerator, keyOrdering, RowIterator.fromScala(leftIter), - numLeftRows, - RowIterator.fromScala(rightIter), - numRightRows + RowIterator.fromScala(rightIter) ) private[this] val joinRow = new JoinedRow private[this] val resultProjection: (InternalRow) => InternalRow = @@ -157,9 +151,7 @@ private[joins] class SortMergeJoinScanner( bufferedKeyGenerator: Projection, keyOrdering: Ordering[InternalRow], streamedIter: RowIterator, - numStreamedRows: LongSQLMetric, - bufferedIter: RowIterator, - numBufferedRows: LongSQLMetric) { + bufferedIter: RowIterator) { private[this] var streamedRow: InternalRow = _ private[this] var streamedRowKey: InternalRow = _ private[this] var bufferedRow: InternalRow = _ @@ -284,7 +276,6 @@ private[joins] class SortMergeJoinScanner( if (streamedIter.advanceNext()) { streamedRow = streamedIter.getRow streamedRowKey = streamedKeyGenerator(streamedRow) - numStreamedRows += 1 true } else { streamedRow = null @@ -302,7 +293,6 @@ private[joins] class SortMergeJoinScanner( while (!foundRow && bufferedIter.advanceNext()) { bufferedRow = bufferedIter.getRow bufferedRowKey = bufferedKeyGenerator(bufferedRow) - numBufferedRows += 1 foundRow = !bufferedRowKey.anyNull } if (!foundRow) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala index ed41ad2a00..40a6c93b59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala @@ -40,8 +40,6 @@ case class SortMergeOuterJoin( right: SparkPlan) extends BinaryNode { override private[sql] lazy val metrics = Map( - "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"), - "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) override def output: Seq[Attribute] = { @@ -96,8 +94,6 @@ case class SortMergeOuterJoin( UnsafeProjection.create(rightKeys, right.output) override def doExecute(): RDD[InternalRow] = { - val numLeftRows = longMetric("numLeftRows") - val numRightRows = longMetric("numRightRows") val numOutputRows = longMetric("numOutputRows") left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => @@ -119,9 +115,7 @@ case class SortMergeOuterJoin( bufferedKeyGenerator = createRightKeyGenerator(), keyOrdering, streamedIter = RowIterator.fromScala(leftIter), - numLeftRows, - bufferedIter = RowIterator.fromScala(rightIter), - numRightRows + bufferedIter = RowIterator.fromScala(rightIter) ) val rightNullRow = new GenericInternalRow(right.output.length) new LeftOuterIterator( @@ -133,9 +127,7 @@ case class SortMergeOuterJoin( bufferedKeyGenerator = createLeftKeyGenerator(), keyOrdering, streamedIter = RowIterator.fromScala(rightIter), - numRightRows, - bufferedIter = RowIterator.fromScala(leftIter), - numLeftRows + 
bufferedIter = RowIterator.fromScala(leftIter) ) val leftNullRow = new GenericInternalRow(left.output.length) new RightOuterIterator( @@ -149,9 +141,7 @@ case class SortMergeOuterJoin( rightKeyGenerator = createRightKeyGenerator(), keyOrdering, leftIter = RowIterator.fromScala(leftIter), - numLeftRows, rightIter = RowIterator.fromScala(rightIter), - numRightRows, boundCondition, leftNullRow, rightNullRow) @@ -289,9 +279,7 @@ private class SortMergeFullOuterJoinScanner( rightKeyGenerator: Projection, keyOrdering: Ordering[InternalRow], leftIter: RowIterator, - numLeftRows: LongSQLMetric, rightIter: RowIterator, - numRightRows: LongSQLMetric, boundCondition: InternalRow => Boolean, leftNullRow: InternalRow, rightNullRow: InternalRow) { @@ -321,7 +309,6 @@ private class SortMergeFullOuterJoinScanner( if (leftIter.advanceNext()) { leftRow = leftIter.getRow leftRowKey = leftKeyGenerator(leftRow) - numLeftRows += 1 true } else { leftRow = null @@ -338,7 +325,6 @@ private class SortMergeFullOuterJoinScanner( if (rightIter.advanceNext()) { rightRow = rightIter.getRow rightRowKey = rightKeyGenerator(rightRow) - numRightRows += 1 true } else { rightRow = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index f985dfbd8a..04dd809df1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -36,8 +36,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { test("GeneralHashedRelation") { val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2)) - val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data") - val hashed = HashedRelation(data.iterator, numDataRows, keyProjection) + val hashed = HashedRelation(data.iterator, keyProjection) assert(hashed.isInstanceOf[GeneralHashedRelation]) assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0))) @@ -47,13 +46,11 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { val data2 = CompactBuffer[InternalRow](data(2)) data2 += data(2) assert(hashed.get(data(2)) === data2) - assert(numDataRows.value.value === data.length) } test("UniqueKeyHashedRelation") { val data = Array(InternalRow(0), InternalRow(1), InternalRow(2)) - val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data") - val hashed = HashedRelation(data.iterator, numDataRows, keyProjection) + val hashed = HashedRelation(data.iterator, keyProjection) assert(hashed.isInstanceOf[UniqueKeyHashedRelation]) assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0))) @@ -66,19 +63,17 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { assert(uniqHashed.getValue(data(1)) === data(1)) assert(uniqHashed.getValue(data(2)) === data(2)) assert(uniqHashed.getValue(InternalRow(10)) === null) - assert(numDataRows.value.value === data.length) } test("UnsafeHashedRelation") { val schema = StructType(StructField("a", IntegerType, true) :: Nil) val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2)) - val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data") val toUnsafe = UnsafeProjection.create(schema) val unsafeData = data.map(toUnsafe(_).copy()).toArray val buildKey = Seq(BoundReference(0, IntegerType, false)) val keyGenerator = UnsafeProjection.create(buildKey) - val hashed = 
UnsafeHashedRelation(unsafeData.iterator, numDataRows, keyGenerator, 1) + val hashed = UnsafeHashedRelation(unsafeData.iterator, keyGenerator, 1) assert(hashed.isInstanceOf[UnsafeHashedRelation]) assert(hashed.get(unsafeData(0)) === CompactBuffer[InternalRow](unsafeData(0))) @@ -100,7 +95,6 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { assert(hashed2.get(unsafeData(1)) === CompactBuffer[InternalRow](unsafeData(1))) assert(hashed2.get(toUnsafe(InternalRow(10))) === null) assert(hashed2.get(unsafeData(2)) === data2) - assert(numDataRows.value.value === data.length) val os2 = new ByteArrayOutputStream() val out2 = new ObjectOutputStream(os2) @@ -139,7 +133,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { Seq(BoundReference(0, IntegerType, false), BoundReference(1, IntegerType, true))) val rows = (0 until 100).map(i => unsafeProj(InternalRow(i, i + 1)).copy()) val keyProj = UnsafeProjection.create(Seq(BoundReference(0, IntegerType, false))) - val longRelation = LongHashedRelation(rows.iterator, SQLMetrics.nullLongMetric, keyProj, 100) + val longRelation = LongHashedRelation(rows.iterator, keyProj, 100) assert(longRelation.isInstanceOf[LongArrayRelation]) val longArrayRelation = longRelation.asInstanceOf[LongArrayRelation] (0 until 100).foreach { i => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 2260e48702..d24625a535 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -113,23 +113,12 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { } } - test("Project metrics") { - // Assume the execution plan is - // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0) - val df = person.select('name) - testSparkPlanMetrics(df, 1, Map( - 0L -> ("Project", Map( - "number of rows" -> 2L))) - ) - } - test("Filter metrics") { // Assume the execution plan is // PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0) val df = person.filter('age < 25) testSparkPlanMetrics(df, 1, Map( 0L -> ("Filter", Map( - "number of input rows" -> 2L, "number of output rows" -> 1L))) ) } @@ -149,10 +138,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { val df = testData2.groupBy().count() // 2 partitions testSparkPlanMetrics(df, 1, Map( 2L -> ("TungstenAggregate", Map( - "number of input rows" -> 6L, "number of output rows" -> 2L)), 0L -> ("TungstenAggregate", Map( - "number of input rows" -> 2L, "number of output rows" -> 1L))) ) @@ -160,10 +147,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { val df2 = testData2.groupBy('a).count() testSparkPlanMetrics(df2, 1, Map( 2L -> ("TungstenAggregate", Map( - "number of input rows" -> 6L, "number of output rows" -> 4L)), 0L -> ("TungstenAggregate", Map( - "number of input rows" -> 4L, "number of output rows" -> 3L))) ) } @@ -181,8 +166,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { testSparkPlanMetrics(df, 1, Map( 1L -> ("SortMergeJoin", Map( // It's 4 because we only read 3 rows in the first partition and 1 row in the second one - "number of left rows" -> 4L, - "number of right rows" -> 2L, "number of output rows" -> 4L))) ) } @@ -201,8 +184,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { testSparkPlanMetrics(df, 1, Map( 1L -> ("SortMergeOuterJoin", Map( 
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one - "number of left rows" -> 6L, - "number of right rows" -> 2L, "number of output rows" -> 8L))) ) @@ -211,8 +192,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { testSparkPlanMetrics(df2, 1, Map( 1L -> ("SortMergeOuterJoin", Map( // It's 4 because we only read 3 rows in the first partition and 1 row in the second one - "number of left rows" -> 2L, - "number of right rows" -> 6L, "number of output rows" -> 8L))) ) } @@ -226,8 +205,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { val df = df1.join(broadcast(df2), "key") testSparkPlanMetrics(df, 2, Map( 1L -> ("BroadcastHashJoin", Map( - "number of left rows" -> 2L, - "number of right rows" -> 4L, "number of output rows" -> 2L))) ) } @@ -240,16 +217,12 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer") testSparkPlanMetrics(df, 2, Map( 0L -> ("BroadcastHashOuterJoin", Map( - "number of left rows" -> 3L, - "number of right rows" -> 4L, "number of output rows" -> 5L))) ) val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer") testSparkPlanMetrics(df3, 2, Map( 0L -> ("BroadcastHashOuterJoin", Map( - "number of left rows" -> 3L, - "number of right rows" -> 4L, "number of output rows" -> 6L))) ) } @@ -265,8 +238,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a") testSparkPlanMetrics(df, 3, Map( 1L -> ("BroadcastNestedLoopJoin", Map( - "number of left rows" -> 12L, // left needs to be scanned twice - "number of right rows" -> 2L, "number of output rows" -> 12L))) ) } @@ -280,8 +251,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi") testSparkPlanMetrics(df, 2, Map( 0L -> ("BroadcastLeftSemiJoinHash", Map( - "number of left rows" -> 2L, - "number of right rows" -> 4L, "number of output rows" -> 2L))) ) } @@ -295,8 +264,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { val df = df1.join(df2, $"key" === $"key2", "leftsemi") testSparkPlanMetrics(df, 1, Map( 0L -> ("LeftSemiJoinHash", Map( - "number of left rows" -> 2L, - "number of right rows" -> 4L, "number of output rows" -> 2L))) ) } @@ -310,8 +277,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { val df = df1.join(df2, $"key" < $"key2", "leftsemi") testSparkPlanMetrics(df, 2, Map( 0L -> ("LeftSemiJoinBNL", Map( - "number of left rows" -> 2L, - "number of right rows" -> 4L, "number of output rows" -> 2L))) ) } @@ -326,8 +291,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { "SELECT * FROM testData2 JOIN testDataForJoin") testSparkPlanMetrics(df, 1, Map( 1L -> ("CartesianProduct", Map( - "number of left rows" -> 12L, // left needs to be scanned twice - "number of right rows" -> 4L, // right is read twice "number of output rows" -> 12L))) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index a3e5243b68..d3191d3aea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -92,7 +92,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { override def 
onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { - metrics += qe.executedPlan.longMetric("numInputRows").value.value + metrics += qe.executedPlan.longMetric("numOutputRows").value.value } } sqlContext.listenerManager.register(listener) @@ -105,9 +105,9 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { } assert(metrics.length == 3) - assert(metrics(0) == 1) - assert(metrics(1) == 1) - assert(metrics(2) == 2) + assert(metrics(0) === 1) + assert(metrics(1) === 1) + assert(metrics(2) === 2) sqlContext.listenerManager.unregister(listener) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index eff8833e92..235b80b7c6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.hive._ import org.apache.spark.sql.types.{BooleanType, DataType} import org.apache.spark.util.Utils @@ -52,6 +53,9 @@ case class HiveTableScan( require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, "Partition pruning predicates only supported for partitioned tables.") + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + override def producedAttributes: AttributeSet = outputSet ++ AttributeSet(partitionPruningPred.flatMap(_.references)) @@ -146,9 +150,13 @@ case class HiveTableScan( prunePartitions(relation.getHiveQlPartitions(partitionPruningPred))) } } + val numOutputRows = longMetric("numOutputRows") rdd.mapPartitionsInternal { iter => val proj = UnsafeProjection.create(schema) - iter.map(proj) + iter.map { r => + numOutputRows += 1 + proj(r) + } } } -- cgit v1.2.3
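For a rough sense of why removing the redundant counters is worthwhile: with whole-stage codegen the per-row work of many operators is tiny, so an extra accumulator update per row per operator boundary becomes a measurable fraction of the total. The following self-contained micro-benchmark is only an illustrative sketch under that assumption; it is not Spark's benchmark harness, and JIT effects make the absolute numbers unreliable:

```scala
// Illustrative micro-benchmark only: a hypothetical sketch of the per-row
// metric-overhead argument, not a measurement of Spark itself.
object MetricOverheadSketch {
  final class LongMetric { private var v = 0L; def +=(n: Long): Unit = v += n; def value: Long = v }

  // Time a loop body and print elapsed milliseconds plus a checksum so the
  // JIT cannot discard the work entirely.
  def time(label: String)(body: => Long): Unit = {
    val start = System.nanoTime()
    val result = body
    val ms = (System.nanoTime() - start) / 1e6
    println(f"$label%-24s $ms%8.1f ms (checksum=$result)")
  }

  def main(args: Array[String]): Unit = {
    val n = 100000000
    val numInputRows = new LongMetric   // the redundant counter this patch removes
    val numOutputRows = new LongMetric  // the counter this patch keeps

    // Before: both an input-side and an output-side counter fire for every row.
    time("input + output metrics") {
      var i = 0; var sum = 0L
      while (i < n) { numInputRows += 1; sum += i; numOutputRows += 1; i += 1 }
      sum
    }
    // After: only the producer counts its output rows.
    time("output metric only") {
      var i = 0; var sum = 0L
      while (i < n) { sum += i; numOutputRows += 1; i += 1 }
      sum
    }
  }
}
```

The point is the ratio rather than the absolute timings: dropping the input-side counter eliminates roughly half of the metric updates on hot paths, which is exactly the saving the commit message claims for codegen'd plans.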